diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index e3a8186094f9e..ac0de9dba1d49 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -280,8 +280,17 @@ class Partition(val topic: String, leaderEpoch = partitionStateInfo.basePartitionState.leaderEpoch leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset) zkVersion = partitionStateInfo.basePartitionState.zkVersion - val isNewLeader = leaderReplicaIdOpt.map(_ != localBrokerId).getOrElse(true) + // In the case of successive leader elections in a short time period, a follower may have + // entries in its log from a later epoch than any entry in the new leader's log. In order + // to ensure that these followers can truncate to the right offset, we must cache the new + // leader epoch and the start offset since it should be larger than any epoch that a follower + // would try to query. + leaderReplica.epochs.foreach { epochCache => + epochCache.assign(leaderEpoch, leaderEpochStartOffset) + } + + val isNewLeader = !leaderReplicaIdOpt.contains(localBrokerId) val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset val curTimeMs = time.milliseconds // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset. diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala index 4b65e439e2c74..462f1f3cc23da 100644 --- a/core/src/main/scala/kafka/cluster/Replica.scala +++ b/core/src/main/scala/kafka/cluster/Replica.scala @@ -18,6 +18,7 @@ package kafka.cluster import kafka.log.Log +import kafka.server.epoch.LeaderEpochFileCache import kafka.utils.Logging import kafka.server.{LogOffsetMetadata, LogReadResult} import kafka.common.KafkaException @@ -55,7 +56,7 @@ class Replica(val brokerId: Int, def lastCaughtUpTimeMs = _lastCaughtUpTimeMs - val epochs = log.map(_.leaderEpochCache) + val epochs: Option[LeaderEpochFileCache] = log.map(_.leaderEpochCache) info(s"Replica loaded for partition $topicPartition with initial high watermark $initialHighWatermarkValue") log.foreach(_.onHighWatermarkIncremented(initialHighWatermarkValue)) diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 9b423ba593354..eeb569a077123 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -39,7 +39,7 @@ import com.yammer.metrics.core.Gauge import org.apache.kafka.common.utils.{Time, Utils} import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec} import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile} -import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache} +import kafka.server.epoch.LeaderEpochFileCache import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction import java.util.Map.{Entry => JEntry} @@ -208,7 +208,7 @@ class Log(@volatile var dir: File, /* the actual segments of the log */ private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment] - @volatile private var _leaderEpochCache: LeaderEpochCache = initializeLeaderEpochCache() + @volatile private var _leaderEpochCache: LeaderEpochFileCache = initializeLeaderEpochCache() locally { val startMs = time.milliseconds @@ -218,12 +218,12 @@ class Log(@volatile var dir: File, /* Calculate the offset of the 
next message */ nextOffsetMetadata = new LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size) - _leaderEpochCache.clearAndFlushLatest(nextOffsetMetadata.messageOffset) + _leaderEpochCache.truncateFromEnd(nextOffsetMetadata.messageOffset) logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset) // The earliest leader epoch may not be flushed during a hard failure. Recover it here. - _leaderEpochCache.clearAndFlushEarliest(logStartOffset) + _leaderEpochCache.truncateFromStart(logStartOffset) loadProducerState(logEndOffset, reloadFromCleanShutdown = hasCleanShutdownFile) @@ -271,11 +271,11 @@ class Log(@volatile var dir: File, def leaderEpochCache = _leaderEpochCache - private def initializeLeaderEpochCache(): LeaderEpochCache = { + private def initializeLeaderEpochCache(): LeaderEpochFileCache = { // create the log directory if it doesn't exist Files.createDirectories(dir.toPath) - new LeaderEpochFileCache(topicPartition, () => logEndOffsetMetadata, - new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel)) + val checkpointFile = new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir), logDirFailureChannel) + new LeaderEpochFileCache(topicPartition, logEndOffset _, checkpointFile) } private def removeTempFilesAndCollectSwapFiles(): Set[File] = { @@ -352,7 +352,7 @@ class Log(@volatile var dir: File, } } - private def recoverSegment(segment: LogSegment, leaderEpochCache: Option[LeaderEpochCache] = None): Int = lock synchronized { + private def recoverSegment(segment: LogSegment, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = lock synchronized { val stateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs) stateManager.truncateAndReload(logStartOffset, segment.baseOffset, time.milliseconds) logSegments(stateManager.mapEndOffset, segment.baseOffset).foreach { segment => @@ -830,7 +830,7 @@ class Log(@volatile var dir: File, if (newLogStartOffset > logStartOffset) { info(s"Incrementing log start offset to $newLogStartOffset") logStartOffset = newLogStartOffset - _leaderEpochCache.clearAndFlushEarliest(logStartOffset) + _leaderEpochCache.truncateFromStart(logStartOffset) producerStateManager.truncateHead(logStartOffset) updateFirstUnstableOffset() } @@ -1513,7 +1513,7 @@ class Log(@volatile var dir: File, updateLogEndOffset(targetOffset) this.recoveryPoint = math.min(targetOffset, this.recoveryPoint) this.logStartOffset = math.min(targetOffset, this.logStartOffset) - _leaderEpochCache.clearAndFlushLatest(targetOffset) + _leaderEpochCache.truncateFromEnd(targetOffset) loadProducerState(targetOffset, reloadFromCleanShutdown = false) } true diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 5970f42f6d9b6..012494694deea 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -22,7 +22,7 @@ import java.nio.file.attribute.FileTime import java.util.concurrent.TimeUnit import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} -import kafka.server.epoch.LeaderEpochCache +import kafka.server.epoch.LeaderEpochFileCache import kafka.server.{FetchDataInfo, LogOffsetMetadata} import kafka.utils._ import org.apache.kafka.common.errors.CorruptRecordException @@ -265,7 +265,7 @@ class LogSegment private[log] (val log: FileRecords, * @return The number of bytes truncated from the log */ @nonthreadsafe - def recover(producerStateManager: ProducerStateManager, leaderEpochCache: 
Option[LeaderEpochCache] = None): Int = { + def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = { offsetIndex.reset() timeIndex.reset() txnIndex.reset() @@ -293,7 +293,7 @@ class LogSegment private[log] (val log: FileRecords, if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) { leaderEpochCache.foreach { cache => - if (batch.partitionLeaderEpoch > cache.latestEpoch()) // this is to avoid unnecessary warning in cache.assign() + if (batch.partitionLeaderEpoch > cache.latestEpoch) // this is to avoid unnecessary warning in cache.assign() cache.assign(batch.partitionLeaderEpoch, batch.baseOffset) } updateProducerState(producerStateManager, batch) diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala index 74ef3e848ed27..9b5c1bbff7d18 100644 --- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala @@ -29,7 +29,7 @@ import ReplicaAlterLogDirsThread.FetchRequest import ReplicaAlterLogDirsThread.PartitionData import kafka.api.Request import kafka.server.QuotaFactory.UnboundedQuota -import kafka.server.epoch.LeaderEpochCache +import kafka.server.epoch.LeaderEpochFileCache import org.apache.kafka.common.errors.KafkaStorageException import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.{FileRecords, MemoryRecords} @@ -58,7 +58,7 @@ class ReplicaAlterLogDirsThread(name: String, private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes private val fetchSize = brokerConfig.replicaFetchMaxBytes - private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] = replicaMgr.getReplica(tp).map(_.epochs.get) + private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochFileCache] = replicaMgr.getReplica(tp).map(_.epochs.get) def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = { var partitionData: Seq[(TopicPartition, FetchResponse.PartitionData)] = null @@ -148,7 +148,7 @@ class ReplicaAlterLogDirsThread(name: String, val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (tp, epochCacheOpt) => epochCacheOpt.nonEmpty } - val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() } + val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch } ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet) } diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 5e0e9bed3f52d..4fceef847e32d 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -23,7 +23,7 @@ import AbstractFetcherThread.ResultWithPartitions import kafka.api.{FetchRequest => _, _} import kafka.cluster.{BrokerEndPoint, Replica} import kafka.server.ReplicaFetcherThread._ -import kafka.server.epoch.LeaderEpochCache +import kafka.server.epoch.LeaderEpochFileCache import org.apache.kafka.clients.FetchSessionHandler import org.apache.kafka.common.requests.EpochEndOffset._ import org.apache.kafka.common.TopicPartition @@ -79,7 +79,7 @@ class ReplicaFetcherThread(name: String, private val shouldSendLeaderEpochRequest: Boolean = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2 private val fetchSessionHandler = new 
FetchSessionHandler(logContext, sourceBroker.id) - private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] = replicaMgr.getReplica(tp).map(_.epochs.get) + private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochFileCache] = replicaMgr.getReplica(tp).map(_.epochs.get) override def initiateShutdown(): Boolean = { val justShutdown = super.initiateShutdown() @@ -324,7 +324,7 @@ class ReplicaFetcherThread(name: String, val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (tp, epochCacheOpt) => epochCacheOpt.nonEmpty } debug(s"Build leaderEpoch request $partitionsWithEpoch") - val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() } + val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch } ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet) } diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala index 220432d32c0f9..10bdb1708c3a2 100644 --- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala +++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala @@ -18,53 +18,69 @@ package kafka.server.epoch import java.util.concurrent.locks.ReentrantReadWriteLock -import kafka.server.LogOffsetMetadata import kafka.server.checkpoints.LeaderEpochCheckpoint import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET} import kafka.utils.CoreUtils._ import kafka.utils.Logging import org.apache.kafka.common.TopicPartition -import scala.collection.mutable.ListBuffer -trait LeaderEpochCache { - def assign(leaderEpoch: Int, offset: Long) - def latestEpoch(): Int - def endOffsetFor(epoch: Int): Long - def clearAndFlushLatest(offset: Long) - def clearAndFlushEarliest(offset: Long) - def clearAndFlush() - def clear() -} +import scala.collection.mutable.ListBuffer /** - * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica. - * - * Leader Epoch = epoch assigned to each leader by the controller. - * Offset = offset of the first message in each epoch. - * - * @param leo a function that determines the log end offset - * @param checkpoint the checkpoint file - */ -class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetMetadata, checkpoint: LeaderEpochCheckpoint) extends LeaderEpochCache with Logging { + * Represents a cache of (LeaderEpoch => Offset) mappings for a particular replica. + * + * Leader Epoch = epoch assigned to each leader by the controller. + * Offset = offset of the first message in each epoch. 
+ * + * @param topicPartition the associated topic partition + * @param checkpoint the checkpoint file + * @param logEndOffset function to fetch the current log end offset + */ +class LeaderEpochFileCache(topicPartition: TopicPartition, + logEndOffset: () => Long, + checkpoint: LeaderEpochCheckpoint) extends Logging { + this.logIdent = s"[LeaderEpochCache $topicPartition] " + private val lock = new ReentrantReadWriteLock() private var epochs: ListBuffer[EpochEntry] = inWriteLock(lock) { ListBuffer(checkpoint.read(): _*) } /** * Assigns the supplied Leader Epoch to the supplied Offset * Once the epoch is assigned it cannot be reassigned - * - * @param epoch - * @param offset */ - override def assign(epoch: Int, offset: Long): Unit = { + def assign(epoch: Int, startOffset: Long): Unit = { inWriteLock(lock) { - if (epoch >= 0 && epoch > latestEpoch && offset >= latestOffset) { - info(s"Updated PartitionLeaderEpoch. ${epochChangeMsg(epoch, offset)}. Cache now contains ${epochs.size} entries.") - epochs += EpochEntry(epoch, offset) - flush() + val updateNeeded = if (epochs.isEmpty) { + true } else { - validateAndMaybeWarn(epoch, offset) + val lastEntry = epochs.last + lastEntry.epoch != epoch || startOffset < lastEntry.startOffset } + + if (updateNeeded) { + truncateAndAppend(EpochEntry(epoch, startOffset)) + flush() + } + } + } + + /** + * Remove any entries which violate monotonicity following the insertion of an assigned epoch. + */ + private def truncateAndAppend(entryToAppend: EpochEntry): Unit = { + validateAndMaybeWarn(entryToAppend) + + val (retainedEpochs, removedEpochs) = epochs.partition { entry => + entry.epoch < entryToAppend.epoch && entry.startOffset < entryToAppend.startOffset + } + + epochs = retainedEpochs :+ entryToAppend + + if (removedEpochs.isEmpty) { + debug(s"Appended new epoch entry $entryToAppend. Cache now contains ${epochs.size} entries.") + } else { + warn(s"New epoch entry $entryToAppend caused truncation of conflicting entries $removedEpochs. " + + s"Cache now contains ${epochs.size} entries.") } } @@ -74,7 +90,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM * * @return */ - override def latestEpoch(): Int = { + def latestEpoch: Int = { inReadLock(lock) { if (epochs.isEmpty) UNDEFINED_EPOCH else epochs.last.epoch } @@ -90,42 +106,53 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM * if requestedEpoch is < the first epoch cached, UNSUPPORTED_EPOCH_OFFSET will be returned * so that the follower falls back to High Water Mark. * - * @param requestedEpoch - * @return offset + * @param requestedEpoch requested leader epoch + * @return found end offset */ - override def endOffsetFor(requestedEpoch: Int): Long = { + def endOffsetFor(requestedEpoch: Int): Long = { inReadLock(lock) { - val offset = + val endOffset = if (requestedEpoch == UNDEFINED_EPOCH) { - // this may happen if a bootstrapping follower sends a request with undefined epoch or + // This may happen if a bootstrapping follower sends a request with undefined epoch or // a follower is on the older message format where leader epochs are not recorded UNDEFINED_EPOCH_OFFSET } else if (requestedEpoch == latestEpoch) { - leo().messageOffset + // For the leader, the latest epoch is always the current leader epoch that is still being written to. + // Followers should not have any reason to query for the end offset of the current epoch, but a consumer + // might if it is verifying its committed offset following a group rebalance. 
In this case, we return + // the current log end offset which makes the truncation check work as expected. + logEndOffset() } else { - val subsequentEpochs = epochs.filter(e => e.epoch > requestedEpoch) - if (subsequentEpochs.isEmpty || requestedEpoch < epochs.head.epoch) + val subsequentEpochs = epochs.filter { e => e.epoch > requestedEpoch} + if (subsequentEpochs.isEmpty) { + // The requested epoch is larger than any known epoch. This case should never be hit because + // the latest cached epoch is always the largest. UNDEFINED_EPOCH_OFFSET - else + } else { + // We have at least one previous epoch and one subsequent epoch. The result is the first + // prior epoch and the starting offset of the first subsequent epoch. subsequentEpochs.head.startOffset + } } - debug(s"Processed offset for epoch request for partition ${topicPartition} epoch:$requestedEpoch and returning offset $offset from epoch list of size ${epochs.size}") - offset + debug(s"Processed end offset request for epoch $requestedEpoch and returning end offset $endOffset " + + s"from epoch cache of size ${epochs.size}") + endOffset } } /** * Removes all epoch entries from the store with start offsets greater than or equal to the passed offset. - * - * @param offset */ - override def clearAndFlushLatest(offset: Long): Unit = { + def truncateFromEnd(endOffset: Long): Unit = { inWriteLock(lock) { - val before = epochs - if (offset >= 0 && offset <= latestOffset()) { - epochs = epochs.filter(entry => entry.startOffset < offset) + if (endOffset >= 0 && latestEntry.exists(_.startOffset >= endOffset)) { + val (subsequentEntries, previousEntries) = epochs.partition(_.startOffset >= endOffset) + epochs = previousEntries + flush() - info(s"Cleared latest ${before.toSet.filterNot(epochs.toSet)} entries from epoch cache based on passed offset $offset leaving ${epochs.size} in EpochFile for partition $topicPartition") + + debug(s"Cleared entries $subsequentEntries from epoch cache after " + + s"truncating to end offset $endOffset, leaving ${epochs.size} entries in the cache.") } } } @@ -136,20 +163,21 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM * * This method is exclusive: so clearEarliest(6) will retain an entry at offset 6. 
* - * @param offset the offset to clear up to + * @param startOffset the offset to clear up to */ - override def clearAndFlushEarliest(offset: Long): Unit = { + def truncateFromStart(startOffset: Long): Unit = { inWriteLock(lock) { - val before = epochs - if (offset >= 0 && earliestOffset() < offset) { - val earliest = epochs.filter(entry => entry.startOffset < offset) - if (earliest.nonEmpty) { - epochs = epochs --= earliest - //If the offset is less than the earliest offset remaining, add previous epoch back, but with an updated offset - if (offset < earliestOffset() || epochs.isEmpty) - new EpochEntry(earliest.last.epoch, offset) +=: epochs + if (epochs.nonEmpty) { + val (subsequentEntries, previousEntries) = epochs.partition(_.startOffset > startOffset) + + previousEntries.lastOption.foreach { firstBeforeStartOffset => + val updatedFirstEntry = EpochEntry(firstBeforeStartOffset.epoch, startOffset) + epochs = updatedFirstEntry +: subsequentEntries + flush() - info(s"Cleared earliest ${before.toSet.filterNot(epochs.toSet).size} entries from epoch cache based on passed offset $offset leaving ${epochs.size} in EpochFile for partition $topicPartition") + + debug(s"Cleared entries $previousEntries and rewrote first entry $updatedFirstEntry after " + + s"truncating to start offset $startOffset, leaving ${epochs.size} in the cache.") } } } @@ -158,47 +186,55 @@ class LeaderEpochFileCache(topicPartition: TopicPartition, leo: () => LogOffsetM /** * Delete all entries. */ - override def clearAndFlush() = { + def clearAndFlush() = { inWriteLock(lock) { epochs.clear() flush() } } - override def clear() = { + def clear() = { inWriteLock(lock) { epochs.clear() } } - def epochEntries(): ListBuffer[EpochEntry] = { + // Visible for testing + def epochEntries: ListBuffer[EpochEntry] = { epochs } - private def earliestOffset(): Long = { - if (epochs.isEmpty) -1 else epochs.head.startOffset - } - - private def latestOffset(): Long = { - if (epochs.isEmpty) -1 else epochs.last.startOffset - } + private def latestEntry: Option[EpochEntry] = epochs.lastOption private def flush(): Unit = { checkpoint.write(epochs) } - def epochChangeMsg(epoch: Int, offset: Long) = s"New: {epoch:$epoch, offset:$offset}, Current: {epoch:$latestEpoch, offset:$latestOffset} for Partition: $topicPartition" - - def validateAndMaybeWarn(epoch: Int, offset: Long) = { - assert(epoch >= 0, s"Received a PartitionLeaderEpoch assignment for an epoch < 0. This should not happen. ${epochChangeMsg(epoch, offset)}") - if (epoch < latestEpoch()) - warn(s"Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. " + - s"This implies messages have arrived out of order. ${epochChangeMsg(epoch, offset)}") - else if (offset < latestOffset()) - warn(s"Received a PartitionLeaderEpoch assignment for an offset < latest offset for the most recent, stored PartitionLeaderEpoch. " + - s"This implies messages have arrived out of order. ${epochChangeMsg(epoch, offset)}") + private def validateAndMaybeWarn(entry: EpochEntry) = { + if (entry.epoch < 0) { + throw new IllegalArgumentException(s"Received invalid partition leader epoch entry $entry") + } else { + // If the latest append violates the monotonicity of epochs or starting offsets, our choices + // are either to raise an error, ignore the append, or allow the append and truncate the + // conflicting entries from the cache. Raising an error risks killing the fetcher threads in + // pathological cases (i.e. cases we are not yet aware of). 
We instead take the final approach + // and assume that the latest append is always accurate. + + latestEntry.foreach { latest => + if (entry.epoch < latest.epoch) + warn(s"Received leader epoch assignment $entry which has an epoch less than the epoch " + + s"of the latest entry $latest. This implies messages have arrived out of order.") + else if (entry.startOffset < latest.startOffset) + warn(s"Received leader epoch assignment $entry which has a starting offset which is less than " + + s"the starting offset of the latest entry $latest. This implies messages have arrived out of order.") + } + } } } // Mapping of epoch to the first offset of the subsequent epoch -case class EpochEntry(epoch: Int, startOffset: Long) +case class EpochEntry(epoch: Int, startOffset: Long) { + override def toString: String = { + s"EpochEntry(epoch=$epoch, startOffset=$startOffset)" + } +} diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index cf679f635136b..e0dcdfdbb555e 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -29,7 +29,7 @@ import org.junit.Assert._ import org.junit.{After, Before, Test} import kafka.utils._ import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailureChannel} -import kafka.server.epoch.{EpochEntry, LeaderEpochCache, LeaderEpochFileCache} +import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.record.MemoryRecords.RecordFilter import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention @@ -268,7 +268,7 @@ class LogTest { } override def recover(producerStateManager: ProducerStateManager, - leaderEpochCache: Option[LeaderEpochCache]): Int = { + leaderEpochCache: Option[LeaderEpochFileCache]): Int = { recoveredSegments += this super.recover(producerStateManager, leaderEpochCache) } @@ -2246,8 +2246,8 @@ class LogTest { log.onHighWatermarkIncremented(log.logEndOffset) log.deleteOldSegments() assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments) - assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries().size) - assertEquals("Epoch entry should be the latest epoch and the leo.", EpochEntry(1, 100), epochCache(log).epochEntries().head) + assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries.size) + assertEquals("Epoch entry should be the latest epoch and the leo.", EpochEntry(1, 100), epochCache(log).epochEntries.head) // append some messages to create some segments for (_ <- 0 until 100) @@ -2256,7 +2256,7 @@ class LogTest { log.delete() assertEquals("The number of segments should be 0", 0, log.numberOfSegments) assertEquals("The number of deleted segments should be zero.", 0, log.deleteOldSegments()) - assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries().size) + assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries.size) } @Test @@ -2269,12 +2269,12 @@ class LogTest { log.appendAsLeader(createRecords, leaderEpoch = 0) assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments) - assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries().size) + assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries.size) log.close() log.delete() assertEquals("The number of segments should be 0", 0, log.numberOfSegments) - assertEquals("Epoch entries should have 
gone.", 0, epochCache(log).epochEntries().size) + assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries.size) } @Test @@ -2447,7 +2447,7 @@ class LogTest { for (i <- records.indices) log.appendAsFollower(recordsForEpoch(i)) - assertEquals(42, log.leaderEpochCache.asInstanceOf[LeaderEpochFileCache].latestEpoch()) + assertEquals(42, log.leaderEpochCache.latestEpoch) } @Test @@ -2502,19 +2502,24 @@ class LogTest { @Test def shouldTruncateLeaderEpochFileWhenTruncatingLog() { - def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds) - val logConfig = createLogConfig(segmentBytes = 10 * createRecords.sizeInBytes) + def createRecords(startOffset: Long, epoch: Int): MemoryRecords = { + TestUtils.records(Seq(new SimpleRecord("value".getBytes)), + baseOffset = startOffset, partitionLeaderEpoch = epoch) + } + + val logConfig = createLogConfig(segmentBytes = 10 * createRecords(0, 0).sizeInBytes) val log = createLog(logDir, logConfig) val cache = epochCache(log) - //Given 2 segments, 10 messages per segment - for (epoch <- 1 to 20) - log.appendAsLeader(createRecords, leaderEpoch = 0) + def append(epoch: Int, startOffset: Long, count: Int): Unit = { + for (i <- 0 until count) + log.appendAsFollower(createRecords(startOffset + i, epoch)) + } - //Simulate some leader changes at specific offsets - cache.assign(0, 0) - cache.assign(1, 10) - cache.assign(2, 16) + //Given 2 segments, 10 messages per segment + append(epoch = 0, startOffset = 0, count = 10) + append(epoch = 1, startOffset = 10, count = 6) + append(epoch = 2, startOffset = 16, count = 4) assertEquals(2, log.numberOfSegments) assertEquals(20, log.logEndOffset) @@ -2566,7 +2571,7 @@ class LogTest { assertEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries) // deliberately remove some of the epoch entries - leaderEpochCache.clearAndFlushLatest(2) + leaderEpochCache.truncateFromEnd(2) assertNotEquals(ListBuffer(EpochEntry(1, 0), EpochEntry(2, 1), EpochEntry(3, 3)), leaderEpochCache.epochEntries) log.close() diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala index 8212ed680c5ba..cb914c43344ee 100644 --- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala +++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala @@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean import kafka.cluster.{Partition, Replica} import kafka.log.Log -import kafka.server.epoch.LeaderEpochCache +import kafka.server.epoch.LeaderEpochFileCache import kafka.utils._ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.metrics.Metrics @@ -218,7 +218,7 @@ class IsrExpirationTest { private def logMock: Log = { val log = EasyMock.createMock(classOf[kafka.log.Log]) - val cache = EasyMock.createNiceMock(classOf[LeaderEpochCache]) + val cache = EasyMock.createNiceMock(classOf[LeaderEpochFileCache]) EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes() EasyMock.expect(log.leaderEpochCache).andReturn(cache).anyTimes() EasyMock.expect(log.onHighWatermarkIncremented(0L)) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala index 2074044d0e129..1f4d04b7cb475 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala +++ 
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala @@ -19,8 +19,8 @@ package kafka.server import kafka.cluster.{BrokerEndPoint, Replica} import kafka.log.LogManager import kafka.cluster.Partition -import kafka.server.epoch.LeaderEpochCache import org.apache.kafka.common.requests.EpochEndOffset._ +import kafka.server.epoch.LeaderEpochFileCache import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition @@ -109,7 +109,7 @@ class ReplicaFetcherThreadTest { //Setup all dependencies val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochCache]) + val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) val logManager = createMock(classOf[LogManager]) val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) val replica = createNiceMock(classOf[Replica]) @@ -167,7 +167,7 @@ class ReplicaFetcherThreadTest { // Setup all the dependencies val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps) val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createMock(classOf[LeaderEpochCache]) + val leaderEpochs = createMock(classOf[LeaderEpochFileCache]) val logManager = createMock(classOf[LogManager]) val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) val replica = createNiceMock(classOf[Replica]) @@ -213,7 +213,7 @@ class ReplicaFetcherThreadTest { // Setup all the dependencies val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps) val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochCache]) + val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) val logManager = createMock(classOf[LogManager]) val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) val replica = createNiceMock(classOf[Replica]) @@ -258,7 +258,7 @@ class ReplicaFetcherThreadTest { // Setup all the dependencies val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps) val quota = createNiceMock(classOf[kafka.server.ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochCache]) + val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) val logManager = createMock(classOf[kafka.log.LogManager]) val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) val replica = createNiceMock(classOf[Replica]) @@ -314,7 +314,7 @@ class ReplicaFetcherThreadTest { //Setup all stubs val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochCache]) + val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) val logManager = createNiceMock(classOf[LogManager]) val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) val replica = createNiceMock(classOf[Replica]) @@ -362,7 +362,7 @@ class ReplicaFetcherThreadTest { //Setup all stubs val quota = createNiceMock(classOf[ReplicationQuotaManager]) - val leaderEpochs = createNiceMock(classOf[LeaderEpochCache]) + val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache]) val logManager = createNiceMock(classOf[LogManager]) val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager]) val replica = createNiceMock(classOf[Replica]) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 3be33a2738c39..01ba4b03694d4 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -26,7 +26,7 @@ import kafka.log.{Log, LogConfig, LogManager, ProducerStateManager} import kafka.utils.{MockScheduler, MockTime, TestUtils} import TestUtils.createBroker import kafka.cluster.BrokerEndPoint -import kafka.server.epoch.LeaderEpochCache +import kafka.server.epoch.LeaderEpochFileCache import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend import kafka.utils.timer.MockTimer import kafka.zk.KafkaZkClient @@ -624,8 +624,8 @@ class ReplicaManagerTest { val mockScheduler = new MockScheduler(time) val mockBrokerTopicStats = new BrokerTopicStats val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size) - val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochCache]) - EasyMock.expect(mockLeaderEpochCache.latestEpoch()).andReturn(leaderEpochFromLeader) + val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochFileCache]) + EasyMock.expect(mockLeaderEpochCache.latestEpoch).andReturn(leaderEpochFromLeader) EasyMock.expect(mockLeaderEpochCache.endOffsetFor(leaderEpochFromLeader)) .andReturn(localLogOffset) EasyMock.replay(mockLeaderEpochCache) @@ -644,7 +644,7 @@ class ReplicaManagerTest { new File(new File(config.logDirs.head), s"$topic-$topicPartition"), 30000), logDirFailureChannel = mockLogDirFailureChannel) { - override def leaderEpochCache: LeaderEpochCache = mockLeaderEpochCache + override def leaderEpochCache: LeaderEpochFileCache = mockLeaderEpochCache override def logEndOffsetMetadata = LogOffsetMetadata(localLogOffset) } diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala index e7c6a9785bce6..0c47f15a09ff2 100644 --- a/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala +++ b/core/src/test/scala/unit/kafka/server/checkpoints/LeaderEpochCheckpointFileTest.scala @@ -24,7 +24,6 @@ import org.junit.Assert._ import org.junit.Test import org.scalatest.junit.JUnitSuite - class LeaderEpochCheckpointFileTest extends JUnitSuite with Logging{ @Test diff --git a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala index 6288d8faf1d94..bd87bc2951ff6 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala +++ b/core/src/test/scala/unit/kafka/server/epoch/EpochDrivenReplicationProtocolAcceptanceTest.scala @@ -89,23 +89,23 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness assertEquals(0, latestRecord(follower).partitionLeaderEpoch()) //Both leader and follower should have recorded Epoch 0 at Offset 0 - assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries()) - assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries()) + assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries) + assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries) //Bounce the follower bounce(follower) awaitISR(tp) //Nothing happens yet as we haven't sent any new messages. 
- assertEquals(Buffer(EpochEntry(0, 0)), epochCache(leader).epochEntries()) - assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries()) + assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries) + assertEquals(Buffer(EpochEntry(0, 0)), epochCache(follower).epochEntries) //Send a message producer.send(new ProducerRecord(topic, 0, null, msg)).get //Epoch1 should now propagate to the follower with the written message - assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries()) - assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries()) + assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries) + assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries) //The new message should have epoch 1 stamped assertEquals(1, latestRecord(leader).partitionLeaderEpoch()) @@ -116,8 +116,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness awaitISR(tp) //Epochs 2 should be added to the leader, but not on the follower (yet), as there has been no replication. - assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(leader).epochEntries()) - assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries()) + assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries) + assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1)), epochCache(follower).epochEntries) //Send a message producer.send(new ProducerRecord(topic, 0, null, msg)).get @@ -127,8 +127,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness assertEquals(2, latestRecord(follower).partitionLeaderEpoch()) //The leader epoch files should now match on leader and follower - assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries()) - assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(follower).epochEntries()) + assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(leader).epochEntries) + assertEquals(Buffer(EpochEntry(0, 0), EpochEntry(1, 1), EpochEntry(2, 2)), epochCache(follower).epochEntries) } @Test @@ -300,8 +300,8 @@ class EpochDrivenReplicationProtocolAcceptanceTest extends ZooKeeperTestHarness private def log(leader: KafkaServer, follower: KafkaServer): Unit = { info(s"Bounce complete for follower ${follower.config.brokerId}") - info(s"Leader: leo${leader.config.brokerId}: " + getLog(leader, 0).logEndOffset + " cache: " + epochCache(leader).epochEntries()) - info(s"Follower: leo${follower.config.brokerId}: " + getLog(follower, 0).logEndOffset + " cache: " + epochCache(follower).epochEntries()) + info(s"Leader: leo${leader.config.brokerId}: " + getLog(leader, 0).logEndOffset + " cache: " + epochCache(leader).epochEntries) + info(s"Follower: leo${follower.config.brokerId}: " + getLog(follower, 0).logEndOffset + " cache: " + epochCache(follower).epochEntries) } private def waitForLogsToMatch(b1: KafkaServer, b2: KafkaServer, partition: Int = 0): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala index 4a8df11f8a367..6cd08c7dd7f8b 100644 --- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala +++ 
b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala @@ -16,6 +16,7 @@ */ package kafka.server.epoch + import java.io.File import kafka.server.LogOffsetMetadata @@ -24,7 +25,7 @@ import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFIN import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition import org.junit.Assert._ -import org.junit.{Before, Test} +import org.junit.Test import scala.collection.mutable.ListBuffer @@ -33,51 +34,42 @@ import scala.collection.mutable.ListBuffer */ class LeaderEpochFileCacheTest { val tp = new TopicPartition("TestTopic", 5) - var checkpoint: LeaderEpochCheckpoint = _ + private var logEndOffset = 0L + private val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint { + private var epochs: Seq[EpochEntry] = Seq() + override def write(epochs: Seq[EpochEntry]): Unit = this.epochs = epochs + override def read(): Seq[EpochEntry] = this.epochs + } + private val cache = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint) @Test def shouldAddEpochAndMessageOffsetToCache() = { - var leo = 0 - def leoFinder() = new LogOffsetMetadata(leo) - - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - //When - cache.assign(epoch = 2, offset = 10) - leo = 11 + cache.assign(epoch = 2, startOffset = 10) + logEndOffset = 11 //Then - assertEquals(2, cache.latestEpoch()) - assertEquals(EpochEntry(2, 10), cache.epochEntries()(0)) - assertEquals(11, cache.endOffsetFor(2)) //should match leo + assertEquals(2, cache.latestEpoch) + assertEquals(EpochEntry(2, 10), cache.epochEntries(0)) + assertEquals(logEndOffset, cache.endOffsetFor(2)) //should match logEndOffset } @Test def shouldReturnLogEndOffsetIfLatestEpochRequested() = { - var leo = 0 - def leoFinder() = new LogOffsetMetadata(leo) - - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - //When just one epoch - cache.assign(epoch = 2, offset = 11) - cache.assign(epoch = 2, offset = 12) - leo = 14 + cache.assign(epoch = 2, startOffset = 11) + cache.assign(epoch = 2, startOffset = 12) + logEndOffset = 14 //Then - assertEquals(14, cache.endOffsetFor(2)) + assertEquals(logEndOffset, cache.endOffsetFor(2)) } @Test def shouldReturnUndefinedOffsetIfUndefinedEpochRequested() = { - def leoFinder() = new LogOffsetMetadata(0) - - //Given cache with some data on leader - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - cache.assign(epoch = 2, offset = 11) - cache.assign(epoch = 3, offset = 12) + // assign couple of epochs + cache.assign(epoch = 2, startOffset = 11) + cache.assign(epoch = 3, startOffset = 12) //When (say a bootstraping follower) sends request for UNDEFINED_EPOCH val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH) @@ -88,68 +80,51 @@ class LeaderEpochFileCacheTest { @Test def shouldNotOverwriteLogEndOffsetForALeaderEpochOnceItHasBeenAssigned() = { - var leo = 0 - def leoFinder() = new LogOffsetMetadata(leo) - //Given - leo = 9 - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) + logEndOffset = 9 - cache.assign(2, leo) + cache.assign(2, logEndOffset) //When called again later cache.assign(2, 10) //Then the offset should NOT have been updated - assertEquals(leo, cache.epochEntries()(0).startOffset) + assertEquals(logEndOffset, cache.epochEntries(0).startOffset) + assertEquals(ListBuffer(EpochEntry(2, 9)), cache.epochEntries) } @Test - def shouldAllowLeaderEpochToChangeEvenIfOffsetDoesNot() = { - def leoFinder() = new LogOffsetMetadata(0) - + def 
shouldEnforceMonotonicallyIncreasingStartOffsets() = { //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) cache.assign(2, 9) //When update epoch new epoch but same offset cache.assign(3, 9) //Then epoch should have been updated - assertEquals(ListBuffer(EpochEntry(2, 9), EpochEntry(3, 9)), cache.epochEntries()) + assertEquals(ListBuffer(EpochEntry(3, 9)), cache.epochEntries) } @Test def shouldNotOverwriteOffsetForALeaderEpochOnceItHasBeenAssigned() = { - //Given - val cache = new LeaderEpochFileCache(tp, () => new LogOffsetMetadata(0), checkpoint) cache.assign(2, 6) //When called again later with a greater offset cache.assign(2, 10) //Then later update should have been ignored - assertEquals(6, cache.epochEntries()(0).startOffset) + assertEquals(6, cache.epochEntries(0).startOffset) } @Test def shouldReturnUnsupportedIfNoEpochRecorded(){ - def leoFinder() = new LogOffsetMetadata(0) - - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - //Then assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(0)) } @Test def shouldReturnUnsupportedIfNoEpochRecordedAndUndefinedEpochRequested(){ - val leo = 73 - def leoFinder() = new LogOffsetMetadata(leo) - - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) + logEndOffset = 73 //When (say a follower on older message format version) sends request for UNDEFINED_EPOCH val offsetFor = cache.endOffsetFor(UNDEFINED_EPOCH) @@ -159,39 +134,41 @@ class LeaderEpochFileCacheTest { } @Test - def shouldReturnUnsupportedIfRequestedEpochLessThanFirstEpoch(){ - def leoFinder() = new LogOffsetMetadata(0) - - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - - cache.assign(epoch = 5, offset = 11) - cache.assign(epoch = 6, offset = 12) - cache.assign(epoch = 7, offset = 13) + def shouldReturnFirstEpochIfRequestedEpochLessThanFirstEpoch(){ + cache.assign(epoch = 5, startOffset = 11) + cache.assign(epoch = 6, startOffset = 12) + cache.assign(epoch = 7, startOffset = 13) //When - val offset = cache.endOffsetFor(5 - 1) + val endOffset = cache.endOffsetFor(4) //Then - assertEquals(UNDEFINED_EPOCH_OFFSET, offset) + assertEquals(11, endOffset) } @Test - def shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch() = { - var leo = 0 - def leoFinder() = new LogOffsetMetadata(leo) + def shouldTruncateIfMatchingEpochButEarlierStartingOffset(): Unit = { + cache.assign(epoch = 5, startOffset = 11) + cache.assign(epoch = 6, startOffset = 12) + cache.assign(epoch = 7, startOffset = 13) - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) + // epoch 7 starts at an earlier offset + cache.assign(epoch = 7, startOffset = 12) + assertEquals(12, cache.endOffsetFor(5)) + assertEquals(12, cache.endOffsetFor(6)) + } + + @Test + def shouldGetFirstOffsetOfSubsequentEpochWhenOffsetRequestedForPreviousEpoch() = { //When several epochs - cache.assign(epoch = 1, offset = 11) - cache.assign(epoch = 1, offset = 12) - cache.assign(epoch = 2, offset = 13) - cache.assign(epoch = 2, offset = 14) - cache.assign(epoch = 3, offset = 15) - cache.assign(epoch = 3, offset = 16) - leo = 17 + cache.assign(epoch = 1, startOffset = 11) + cache.assign(epoch = 1, startOffset = 12) + cache.assign(epoch = 2, startOffset = 13) + cache.assign(epoch = 2, startOffset = 14) + cache.assign(epoch = 3, startOffset = 15) + cache.assign(epoch = 3, startOffset = 16) + logEndOffset = 17 //Then get the start offset of the next epoch assertEquals(15, 
cache.endOffsetFor(2)) @@ -199,15 +176,10 @@ class LeaderEpochFileCacheTest { @Test def shouldReturnNextAvailableEpochIfThereIsNoExactEpochForTheOneRequested(){ - def leoFinder() = new LogOffsetMetadata(0) - - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - //When - cache.assign(epoch = 0, offset = 10) - cache.assign(epoch = 2, offset = 13) - cache.assign(epoch = 4, offset = 17) + cache.assign(epoch = 0, startOffset = 10) + cache.assign(epoch = 2, startOffset = 13) + cache.assign(epoch = 4, startOffset = 17) //Then assertEquals(13, cache.endOffsetFor(requestedEpoch = 1)) @@ -216,14 +188,9 @@ class LeaderEpochFileCacheTest { @Test def shouldNotUpdateEpochAndStartOffsetIfItDidNotChange() = { - def leoFinder() = new LogOffsetMetadata(0) - - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - //When - cache.assign(epoch = 2, offset = 6) - cache.assign(epoch = 2, offset = 7) + cache.assign(epoch = 2, startOffset = 6) + cache.assign(epoch = 2, startOffset = 7) //Then assertEquals(1, cache.epochEntries.size) @@ -232,14 +199,10 @@ class LeaderEpochFileCacheTest { @Test def shouldReturnInvalidOffsetIfEpochIsRequestedWhichIsNotCurrentlyTracked(): Unit = { - val leo = 100 - def leoFinder() = new LogOffsetMetadata(leo) - - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) + logEndOffset = 100 //When - cache.assign(epoch = 2, offset = 100) + cache.assign(epoch = 2, startOffset = 100) //Then assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(3)) @@ -247,35 +210,28 @@ class LeaderEpochFileCacheTest { @Test def shouldSupportEpochsThatDoNotStartFromZero(): Unit = { - var leo = 0 - def leoFinder() = new LogOffsetMetadata(leo) - - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - //When - cache.assign(epoch = 2, offset = 6) - leo = 7 + cache.assign(epoch = 2, startOffset = 6) + logEndOffset = 7 //Then - assertEquals(leo, cache.endOffsetFor(2)) + assertEquals(logEndOffset, cache.endOffsetFor(2)) assertEquals(1, cache.epochEntries.size) - assertEquals(EpochEntry(2, 6), cache.epochEntries()(0)) + assertEquals(EpochEntry(2, 6), cache.epochEntries(0)) } @Test def shouldPersistEpochsBetweenInstances(){ - def leoFinder() = new LogOffsetMetadata(0) val checkpointPath = TestUtils.tempFile().getAbsolutePath - checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath)) + val checkpoint = new LeaderEpochCheckpointFile(new File(checkpointPath)) //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - cache.assign(epoch = 2, offset = 6) + val cache = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint) + cache.assign(epoch = 2, startOffset = 6) //When val checkpoint2 = new LeaderEpochCheckpointFile(new File(checkpointPath)) - val cache2 = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint2) + val cache2 = new LeaderEpochFileCache(tp, logEndOffset _, checkpoint2) //Then assertEquals(1, cache2.epochEntries.size) @@ -283,81 +239,68 @@ class LeaderEpochFileCacheTest { } @Test - def shouldNotLetEpochGoBackwardsEvenIfMessageEpochsDo(): Unit = { - var leo = 0 - def leoFinder() = new LogOffsetMetadata(leo) - - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - + def shouldEnforceMonotonicallyIncreasingEpochs(): Unit = { //Given - cache.assign(epoch = 1, offset = 5); leo = 6 - cache.assign(epoch = 2, offset = 6); leo = 7 - - //When we update an epoch in the past with an earlier offset - cache.assign(epoch = 1, offset = 7); leo = 8 + 
cache.assign(epoch = 1, startOffset = 5); logEndOffset = 6 + cache.assign(epoch = 2, startOffset = 6); logEndOffset = 7 - //Then epoch should not be changed - assertEquals(2, cache.latestEpoch()) + //When we update an epoch in the past with a different offset, the log has already reached + //an inconsistent state. Our options are either to raise an error, ignore the new append, + //or truncate the cached epochs to the point of conflict. We take this latter approach in + //order to guarantee that epochs and offsets in the cache increase monotonically, which makes + //the search logic simpler to reason about. + cache.assign(epoch = 1, startOffset = 7); logEndOffset = 8 - //Then end offset for epoch 1 shouldn't have changed - assertEquals(6, cache.endOffsetFor(1)) + //Then later epochs will be removed + assertEquals(1, cache.latestEpoch) - //Then end offset for epoch 2 has to be the offset of the epoch 1 message (I can't thing of a better option) - assertEquals(8, cache.endOffsetFor(2)) + //Then end offset for epoch 1 will have changed + assertEquals(8, cache.endOffsetFor(1)) - //Epoch history shouldn't have changed - assertEquals(EpochEntry(1, 5), cache.epochEntries()(0)) - assertEquals(EpochEntry(2, 6), cache.epochEntries()(1)) + //Then end offset for epoch 2 is now undefined + assertEquals(UNDEFINED_EPOCH_OFFSET, cache.endOffsetFor(2)) + assertEquals(EpochEntry(1, 7), cache.epochEntries(0)) } @Test - def shouldNotLetOffsetsGoBackwardsEvenIfEpochsProgress() = { - def leoFinder() = new LogOffsetMetadata(0) - - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - + def shouldEnforceOffsetsIncreaseMonotonically() = { //When epoch goes forward but offset goes backwards - cache.assign(epoch = 2, offset = 6) - cache.assign(epoch = 3, offset = 5) + cache.assign(epoch = 2, startOffset = 6) + cache.assign(epoch = 3, startOffset = 5) - //Then latter assign should be ignored - assertEquals(EpochEntry(2, 6), cache.epochEntries.toList(0)) + //The last assignment wins and the conflicting one is removed from the log + assertEquals(EpochEntry(3, 5), cache.epochEntries.toList(0)) } @Test def shouldIncreaseAndTrackEpochsAsLeadersChangeManyTimes(): Unit = { - var leo = 0 - def leoFinder() = new LogOffsetMetadata(leo) - //Given - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - cache.assign(epoch = 0, offset = 0) //leo=0 + cache.assign(epoch = 0, startOffset = 0) //logEndOffset=0 //When - cache.assign(epoch = 1, offset = 0) //leo=0 + cache.assign(epoch = 1, startOffset = 0) //logEndOffset=0 //Then epoch should go up - assertEquals(1, cache.latestEpoch()) + assertEquals(1, cache.latestEpoch) //offset for 1 should still be 0 assertEquals(0, cache.endOffsetFor(1)) //offset for epoch 0 should still be 0 assertEquals(0, cache.endOffsetFor(0)) //When we write 5 messages as epoch 1 - leo = 5 + logEndOffset = 5 - //Then end offset for epoch(1) should be leo => 5 + //Then end offset for epoch(1) should be logEndOffset => 5 assertEquals(5, cache.endOffsetFor(1)) //Epoch 0 should still be at offset 0 assertEquals(0, cache.endOffsetFor(0)) //When - cache.assign(epoch = 2, offset = 5) //leo=5 + cache.assign(epoch = 2, startOffset = 5) //logEndOffset=5 - leo = 10 //write another 5 messages + logEndOffset = 10 //write another 5 messages - //Then end offset for epoch(2) should be leo => 10 + //Then end offset for epoch(2) should be logEndOffset => 10 assertEquals(10, cache.endOffsetFor(2)) //end offset for epoch(1) should be the start offset of epoch(2) => 5 @@ -369,36 
+312,30 @@ class LeaderEpochFileCacheTest { @Test def shouldIncreaseAndTrackEpochsAsFollowerReceivesManyMessages(): Unit = { - var leo = 0 - def leoFinder() = new LogOffsetMetadata(leo) - - //When new - val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint) - //When Messages come in - cache.assign(epoch = 0, offset = 0); leo = 1 - cache.assign(epoch = 0, offset = 1); leo = 2 - cache.assign(epoch = 0, offset = 2); leo = 3 + cache.assign(epoch = 0, startOffset = 0); logEndOffset = 1 + cache.assign(epoch = 0, startOffset = 1); logEndOffset = 2 + cache.assign(epoch = 0, startOffset = 2); logEndOffset = 3 //Then epoch should stay, offsets should grow - assertEquals(0, cache.latestEpoch()) - assertEquals(leo, cache.endOffsetFor(0)) + assertEquals(0, cache.latestEpoch) + assertEquals(logEndOffset, cache.endOffsetFor(0)) //When messages arrive with greater epoch - cache.assign(epoch = 1, offset = 3); leo = 4 - cache.assign(epoch = 1, offset = 4); leo = 5 - cache.assign(epoch = 1, offset = 5); leo = 6 + cache.assign(epoch = 1, startOffset = 3); logEndOffset = 4 + cache.assign(epoch = 1, startOffset = 4); logEndOffset = 5 + cache.assign(epoch = 1, startOffset = 5); logEndOffset = 6 - assertEquals(1, cache.latestEpoch()) - assertEquals(leo, cache.endOffsetFor(1)) + assertEquals(1, cache.latestEpoch) + assertEquals(logEndOffset, cache.endOffsetFor(1)) //When - cache.assign(epoch = 2, offset = 6); leo = 7 - cache.assign(epoch = 2, offset = 7); leo = 8 - cache.assign(epoch = 2, offset = 8); leo = 9 + cache.assign(epoch = 2, startOffset = 6); logEndOffset = 7 + cache.assign(epoch = 2, startOffset = 7); logEndOffset = 8 + cache.assign(epoch = 2, startOffset = 8); logEndOffset = 9 - assertEquals(2, cache.latestEpoch()) - assertEquals(leo, cache.endOffsetFor(2)) + assertEquals(2, cache.latestEpoch) + assertEquals(logEndOffset, cache.endOffsetFor(2)) //Older epochs should return the start offset of the first message in the subsequent epoch. 
     assertEquals(3, cache.endOffsetFor(0))
@@ -407,16 +344,13 @@ class LeaderEpochFileCacheTest {
   @Test
   def shouldDropEntriesOnEpochBoundaryWhenRemovingLatestEntries(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
     //When clear latest on epoch boundary
-    cache.clearAndFlushLatest(offset = 8)
+    cache.truncateFromEnd(endOffset = 8)
     //Then should remove two latest epochs (remove is inclusive)
     assertEquals(ListBuffer(EpochEntry(2, 6)), cache.epochEntries)
@@ -424,16 +358,13 @@ class LeaderEpochFileCacheTest {
   @Test
   def shouldPreserveResetOffsetOnClearEarliestIfOneExists(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
     //When reset to offset ON epoch boundary
-    cache.clearAndFlushEarliest(offset = 8)
+    cache.truncateFromStart(startOffset = 8)
     //Then should preserve (3, 8)
     assertEquals(ListBuffer(EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
@@ -441,16 +372,13 @@ class LeaderEpochFileCacheTest {
   @Test
   def shouldUpdateSavedOffsetWhenOffsetToClearToIsBetweenEpochs(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
     //When reset to offset BETWEEN epoch boundaries
-    cache.clearAndFlushEarliest(offset = 9)
+    cache.truncateFromStart(startOffset = 9)
     //Then we should retain epoch 3, but update it's offset to 9 as 8 has been removed
     assertEquals(ListBuffer(EpochEntry(3, 9), EpochEntry(4, 11)), cache.epochEntries)
@@ -458,16 +386,13 @@ class LeaderEpochFileCacheTest {
   @Test
   def shouldNotClearAnythingIfOffsetToEarly(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
     //When reset to offset before first epoch offset
-    cache.clearAndFlushEarliest(offset = 1)
+    cache.truncateFromStart(startOffset = 1)
     //Then nothing should change
     assertEquals(ListBuffer(EpochEntry(2, 6),EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
@@ -475,16 +400,13 @@ class LeaderEpochFileCacheTest {
   @Test
   def shouldNotClearAnythingIfOffsetToFirstOffset(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
     //When reset to offset on earliest epoch boundary
-    cache.clearAndFlushEarliest(offset = 6)
+    cache.truncateFromStart(startOffset = 6)
     //Then nothing should change
     assertEquals(ListBuffer(EpochEntry(2, 6),EpochEntry(3, 8), EpochEntry(4, 11)), cache.epochEntries)
@@ -492,16 +414,13 @@ class LeaderEpochFileCacheTest {
   @Test
   def shouldRetainLatestEpochOnClearAllEarliest(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
     //When
-    cache.clearAndFlushEarliest(offset = 11)
+    cache.truncateFromStart(startOffset = 11)
     //Then retain the last
     assertEquals(ListBuffer(EpochEntry(4, 11)), cache.epochEntries)
@@ -509,16 +428,13 @@ class LeaderEpochFileCacheTest {
   @Test
   def shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
     //When we clear from a postition between offset 8 & offset 11
-    cache.clearAndFlushEarliest(offset = 9)
+    cache.truncateFromStart(startOffset = 9)
     //Then we should update the middle epoch entry's offset
     assertEquals(ListBuffer(EpochEntry(3, 9), EpochEntry(4, 11)), cache.epochEntries)
@@ -526,16 +442,13 @@ class LeaderEpochFileCacheTest {
   @Test
   def shouldUpdateOffsetBetweenEpochBoundariesOnClearEarliest2(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 0, offset = 0)
-    cache.assign(epoch = 1, offset = 7)
-    cache.assign(epoch = 2, offset = 10)
+    cache.assign(epoch = 0, startOffset = 0)
+    cache.assign(epoch = 1, startOffset = 7)
+    cache.assign(epoch = 2, startOffset = 10)
     //When we clear from a postition between offset 0 & offset 7
-    cache.clearAndFlushEarliest(offset = 5)
+    cache.truncateFromStart(startOffset = 5)
     //Then we should keeep epoch 0 but update the offset appropriately
     assertEquals(ListBuffer(EpochEntry(0,5), EpochEntry(1, 7), EpochEntry(2, 10)), cache.epochEntries)
@@ -543,16 +456,13 @@ class LeaderEpochFileCacheTest {
   @Test
   def shouldRetainLatestEpochOnClearAllEarliestAndUpdateItsOffset(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
     //When reset to offset beyond last epoch
-    cache.clearAndFlushEarliest(offset = 15)
+    cache.truncateFromStart(startOffset = 15)
     //Then update the last
     assertEquals(ListBuffer(EpochEntry(4, 15)), cache.epochEntries)
@@ -560,51 +470,42 @@ class LeaderEpochFileCacheTest {
   @Test
   def shouldDropEntriesBetweenEpochBoundaryWhenRemovingNewest(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
     //When reset to offset BETWEEN epoch boundaries
-    cache.clearAndFlushLatest(offset = 9)
+    cache.truncateFromEnd(endOffset = 9)
     //Then should keep the preceding epochs
-    assertEquals(3, cache.latestEpoch())
+    assertEquals(3, cache.latestEpoch)
     assertEquals(ListBuffer(EpochEntry(2, 6), EpochEntry(3, 8)), cache.epochEntries)
   }
   @Test
   def shouldClearAllEntries(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
-    //When
+    //When
     cache.clearAndFlush()
-    //Then
+    //Then
     assertEquals(0, cache.epochEntries.size)
   }
   @Test
   def shouldNotResetEpochHistoryHeadIfUndefinedPassed(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
     //When reset to offset on epoch boundary
-    cache.clearAndFlushLatest(offset = UNDEFINED_EPOCH_OFFSET)
+    cache.truncateFromEnd(endOffset = UNDEFINED_EPOCH_OFFSET)
     //Then should do nothing
     assertEquals(3, cache.epochEntries.size)
@@ -612,16 +513,13 @@ class LeaderEpochFileCacheTest {
   @Test
   def shouldNotResetEpochHistoryTailIfUndefinedPassed(): Unit = {
-    def leoFinder() = new LogOffsetMetadata(0)
-
     //Given
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-    cache.assign(epoch = 2, offset = 6)
-    cache.assign(epoch = 3, offset = 8)
-    cache.assign(epoch = 4, offset = 11)
+    cache.assign(epoch = 2, startOffset = 6)
+    cache.assign(epoch = 3, startOffset = 8)
+    cache.assign(epoch = 4, startOffset = 11)
     //When reset to offset on epoch boundary
-    cache.clearAndFlushEarliest(offset = UNDEFINED_EPOCH_OFFSET)
+    cache.truncateFromEnd(endOffset = UNDEFINED_EPOCH_OFFSET)
     //Then should do nothing
     assertEquals(3, cache.epochEntries.size)
@@ -629,54 +527,26 @@ class LeaderEpochFileCacheTest {
   @Test
   def shouldFetchLatestEpochOfEmptyCache(): Unit = {
-    //Given
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //When
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //Then
     assertEquals(-1, cache.latestEpoch)
   }
   @Test
   def shouldFetchEndOffsetOfEmptyCache(): Unit = {
-    //Given
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //When
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //Then
     assertEquals(-1, cache.endOffsetFor(7))
   }
   @Test
   def shouldClearEarliestOnEmptyCache(): Unit = {
-    //Given
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //When
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //Then
-    cache.clearAndFlushEarliest(7)
+    cache.truncateFromStart(7)
  }
   @Test
   def shouldClearLatestOnEmptyCache(): Unit = {
-    //Given
-    def leoFinder() = new LogOffsetMetadata(0)
-
-    //When
-    val cache = new LeaderEpochFileCache(tp, () => leoFinder, checkpoint)
-
     //Then
-    cache.clearAndFlushLatest(7)
+    cache.truncateFromEnd(7)
   }
-  @Before
-  def setUp() {
-    checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile())
-  }
 }
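The tests above pin down the renamed truncation operations on the leader epoch cache. The behaviour can be summarised with a small standalone model (an illustrative sketch only, not the Kafka implementation; the Entry and EpochCacheModel names are invented here): truncateFromEnd drops every entry whose start offset is at or beyond the given offset, while truncateFromStart drops superseded entries and clamps the start offset of the earliest surviving entry.

object EpochCacheModel {
  // Illustrative stand-in for kafka.server.epoch.EpochEntry.
  case class Entry(epoch: Int, startOffset: Long)

  // Mirrors truncateFromEnd: drop entries whose start offset is at or beyond endOffset.
  // A negative offset (UNDEFINED_EPOCH_OFFSET) leaves the history untouched.
  def truncateFromEnd(entries: List[Entry], endOffset: Long): List[Entry] =
    if (endOffset < 0) entries
    else entries.filter(_.startOffset < endOffset)

  // Mirrors truncateFromStart: drop earlier entries, but keep the entry in force at
  // startOffset and clamp its start offset (so (3, 8) becomes (3, 9) when truncating to 9).
  def truncateFromStart(entries: List[Entry], startOffset: Long): List[Entry] =
    if (startOffset < 0) entries
    else entries.lastIndexWhere(_.startOffset <= startOffset) match {
      case -1 => entries
      case i  => Entry(entries(i).epoch, startOffset) :: entries.drop(i + 1)
    }

  def main(args: Array[String]): Unit = {
    val entries = List(Entry(2, 6), Entry(3, 8), Entry(4, 11))
    println(truncateFromEnd(entries, endOffset = 8))      // List(Entry(2,6))
    println(truncateFromStart(entries, startOffset = 9))  // List(Entry(3,9), Entry(4,11))
    println(truncateFromStart(entries, startOffset = 1))  // unchanged
  }
}

Running the model reproduces the assertions in shouldDropEntriesOnEpochBoundaryWhenRemovingLatestEntries and shouldUpdateSavedOffsetWhenOffsetToClearToIsBetweenEpochs, which is the intuition behind the new method names.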
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index dc6ff9edc1a55..0d479fffe4a6a 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -37,9 +37,10 @@ import org.apache.kafka.common.requests.{EpochEndOffset, OffsetsForLeaderEpochRe
 import scala.collection.JavaConverters._
 import scala.collection.Map
+import scala.collection.mutable.ListBuffer
 class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
-  var brokers: Seq[KafkaServer] = null
+  var brokers: ListBuffer[KafkaServer] = ListBuffer()
   val topic1 = "foo"
   val topic2 = "bar"
   val t1p0 = new TopicPartition(topic1, 0)
@@ -60,7 +61,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
   @Test
   def shouldAddCurrentLeaderEpochToMessagesAsTheyAreWrittenToLeader() {
-    brokers = (0 to 1).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
+    brokers ++= (0 to 1).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
     // Given two topics with replication of a single partition
     for (topic <- List(topic1, topic2)) {
@@ -94,14 +95,13 @@ def shouldSendLeaderEpochRequestAndGetAResponse(): Unit = {
     //3 brokers, put partition on 100/101 and then pretend to be 102
-    brokers = (100 to 102).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic1, Map(
-      0 -> Seq(100),
-      1 -> Seq(101)
-    ))
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic2, Map(
-      0 -> Seq(100)
-    ))
+    brokers ++= (100 to 102).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
+
+    val assignment1 = Map(0 -> Seq(100), 1 -> Seq(101))
+    TestUtils.createTopic(zkClient, topic1, assignment1, brokers)
+
+    val assignment2 = Map(0 -> Seq(100))
+    TestUtils.createTopic(zkClient, topic2, assignment2, brokers)
     //Send messages equally to the two partitions, then half as many to a third
     producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 5, acks = -1)
@@ -139,9 +139,12 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
   @Test
   def shouldIncreaseLeaderEpochBetweenLeaderRestarts(): Unit = {
-    //Setup: we are only interested in the single partition on broker 101
-    brokers = Seq(100, 101).map { id => createServer(fromProps(createBrokerConfig(id, zkConnect))) }
+    brokers += createServer(fromProps(createBrokerConfig(100, zkConnect)))
+    assertEquals(100, TestUtils.waitUntilControllerElected(zkClient))
+
+    brokers += createServer(fromProps(createBrokerConfig(101, zkConnect)))
+
     def leo() = brokers(1).replicaManager.getReplica(tp).get.logEndOffset.messageOffset
     adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(tp.topic, Map(tp.partition -> Seq(101)))
     producer = createNewProducer(getBrokerListStrFromServers(brokers), retries = 10, acks = -1)
@@ -151,10 +154,9 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
     var fetcher = new TestFetcherThread(sender(brokers(0), brokers(1)))
     //Then epoch should be 0 and leo: 1
-    var offset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset()
-    assertEquals(1, offset)
-    assertEquals(leo(), offset)
-
+    var epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp)
+    assertEquals(1, epochEndOffset.endOffset)
+    assertEquals(1, leo())
     //2. When broker is bounced
     brokers(1).shutdown()
@@ -163,15 +165,20 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
     producer.send(new ProducerRecord(tp.topic, tp.partition, null, "IHeartLogs".getBytes)).get
     fetcher = new TestFetcherThread(sender(brokers(0), brokers(1)))
-    //Then epoch 0 should still be the start offset of epoch 1
-    offset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset()
-    assertEquals(1, offset)
+    epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 0))(tp)
+    assertEquals(1, epochEndOffset.endOffset)
-    //Then epoch 2 should be the leo (NB: The leader epoch goes up in factors of 2 - This is because we have to first change leader to -1 and then change it again to the live replica)
-    assertEquals(2, fetcher.leaderOffsetsFor(Map(tp -> 2))(tp).endOffset())
-    assertEquals(leo(), fetcher.leaderOffsetsFor(Map(tp -> 2))(tp).endOffset())
+    //No data written in epoch 1
+    epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 1))(tp)
+    assertEquals(1, epochEndOffset.endOffset)
+    //Then epoch 2 should be the leo (NB: The leader epoch goes up in factors of 2 -
+    //This is because we have to first change leader to -1 and then change it again to the live replica)
+    //Note that the expected leader changes depend on the controller being on broker 100, which is not restarted
+    epochEndOffset = fetcher.leaderOffsetsFor(Map(tp -> 2))(tp)
+    assertEquals(2, epochEndOffset.endOffset)
+    assertEquals(2, leo())
     //3. When broker is bounced again
     brokers(1).shutdown()
@@ -180,7 +187,6 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
     producer.send(new ProducerRecord(tp.topic, tp.partition, null, "IHeartLogs".getBytes)).get
     fetcher = new TestFetcherThread(sender(brokers(0), brokers(1)))
-    //Then Epoch 0 should still map to offset 1
     assertEquals(1, fetcher.leaderOffsetsFor(Map(tp -> 0))(tp).endOffset())
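The probes in shouldIncreaseLeaderEpochBetweenLeaderRestarts depend on how a leader answers an OffsetsForLeaderEpoch request. A simplified model of that lookup is sketched below (illustration only: it ignores the undefined-epoch error paths, and the cached entries are assumed for the sake of the example rather than taken from a running broker). The leader returns the start offset of the first epoch newer than the one requested, or its log end offset when the requested epoch is its latest.

object EpochLookupSketch {
  case class CachedEpoch(epoch: Int, startOffset: Long)

  // Start offset of the next newer epoch, or the log end offset for the latest epoch.
  def endOffsetFor(cache: Seq[CachedEpoch], logEndOffset: Long, requestedEpoch: Int): Long =
    cache.find(_.epoch > requestedEpoch).map(_.startOffset).getOrElse(logEndOffset)

  def main(args: Array[String]): Unit = {
    // One record written in epoch 0 and one in epoch 2 (the bounce skips epoch 1).
    val cache = Seq(CachedEpoch(0, 0L), CachedEpoch(2, 1L))
    assert(endOffsetFor(cache, logEndOffset = 2L, requestedEpoch = 0) == 1L) // epoch 0 probe
    assert(endOffsetFor(cache, logEndOffset = 2L, requestedEpoch = 1) == 1L) // no data in epoch 1
    assert(endOffsetFor(cache, logEndOffset = 2L, requestedEpoch = 2) == 2L) // latest epoch maps to the leo
  }
}

This is why the epoch 0 and epoch 1 probes both resolve to offset 1 after the first bounce, while the epoch 2 probe resolves to the log end offset.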
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 1c01d622438eb..da1ebbecc14af 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -47,7 +47,7 @@ class OffsetsForLeaderEpochTest {
     //Stubs
     val mockLog = createNiceMock(classOf[kafka.log.Log])
-    val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochCache])
+    val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochFileCache])
     val logManager = createNiceMock(classOf[kafka.log.LogManager])
     expect(mockCache.endOffsetFor(epochRequested)).andReturn(offset)
     expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes()
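With the LeaderEpochCache trait gone, the stub above mocks the concrete LeaderEpochFileCache class directly; EasyMock's createNiceMock handles classes as well as interfaces. A minimal, self-contained sketch of the same stubbing pattern (the test class name and the epoch and offset values are invented for illustration):

import org.easymock.EasyMock.{createNiceMock, expect, replay}
import kafka.server.epoch.LeaderEpochFileCache

class LeaderEpochStubSketch {
  @org.junit.Test
  def stubsEndOffsetLookup(): Unit = {
    val epochRequested = 5
    val offset = 42L
    // Nice mock: unstubbed calls return defaults instead of failing.
    val mockCache = createNiceMock(classOf[LeaderEpochFileCache])
    expect(mockCache.endOffsetFor(epochRequested)).andReturn(offset)
    replay(mockCache)
    assert(mockCache.endOffsetFor(epochRequested) == offset)
  }
}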
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 7eb5caf52698d..b425df86f5c89 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -377,10 +377,11 @@ object TestUtils extends Logging {
               producerId: Long = RecordBatch.NO_PRODUCER_ID,
               producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
               sequence: Int = RecordBatch.NO_SEQUENCE,
-              baseOffset: Long = 0L): MemoryRecords = {
+              baseOffset: Long = 0L,
+              partitionLeaderEpoch: Int = RecordBatch.NO_PARTITION_LEADER_EPOCH): MemoryRecords = {
     val buf = ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records.asJava))
     val builder = MemoryRecords.builder(buf, magicValue, codec, TimestampType.CREATE_TIME, baseOffset,
-      System.currentTimeMillis, producerId, producerEpoch, sequence)
+      System.currentTimeMillis, producerId, producerEpoch, sequence, false, partitionLeaderEpoch)
     records.foreach(builder.append)
     builder.build()
   }
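The new partitionLeaderEpoch parameter lets test code build batches stamped with a chosen leader epoch. A sketch of how a caller might exercise it (the record contents and epoch value are illustrative, and the snippet assumes the standard Kafka test dependencies on the classpath):

import scala.collection.JavaConverters._
import org.apache.kafka.common.record.SimpleRecord
import org.junit.Assert.assertEquals
import kafka.utils.TestUtils

object RecordsWithEpochSketch {
  def main(args: Array[String]): Unit = {
    val memoryRecords = TestUtils.records(
      Seq(new SimpleRecord("key".getBytes, "value".getBytes)),
      partitionLeaderEpoch = 5)

    // Every batch built by the helper should carry the requested epoch.
    memoryRecords.batches.asScala.foreach { batch =>
      assertEquals(5, batch.partitionLeaderEpoch)
    }
  }
}

Because the parameter defaults to RecordBatch.NO_PARTITION_LEADER_EPOCH, existing call sites of TestUtils.records are unaffected.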