diff --git a/bin/kafka-get-offsets.sh b/bin/kafka-get-offsets.sh index 993a202683309..b9e37b8890b4a 100755 --- a/bin/kafka-get-offsets.sh +++ b/bin/kafka-get-offsets.sh @@ -14,4 +14,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -exec $(dirname $0)/kafka-run-class.sh kafka.tools.GetOffsetShell "$@" +exec $(dirname $0)/kafka-run-class.sh org.apache.kafka.tools.GetOffsetShell "$@" diff --git a/bin/windows/kafka-get-offsets.bat b/bin/windows/kafka-get-offsets.bat index 08b8e27d70fec..89d16671071cd 100644 --- a/bin/windows/kafka-get-offsets.bat +++ b/bin/windows/kafka-get-offsets.bat @@ -14,4 +14,4 @@ rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. rem See the License for the specific language governing permissions and rem limitations under the License. -"%~dp0kafka-run-class.bat" kafka.tools.GetOffsetShell %* +"%~dp0kafka-run-class.bat" org.apache.kafka.tools.GetOffsetShell %* diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala deleted file mode 100644 index 1acbee6976d56..0000000000000 --- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package kafka.tools - -import joptsimple._ -import kafka.utils.{Exit, ToolsUtils} -import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, ListTopicsOptions, OffsetSpec} -import org.apache.kafka.common.{KafkaException, TopicPartition} -import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse} -import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.util.CommandLineUtils -import org.apache.kafka.server.util.TopicFilter.IncludeList -import org.apache.kafka.server.util.TopicPartitionFilter -import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter -import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter -import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter -import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter -import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter - -import java.util.Properties -import java.util.concurrent.ExecutionException -import java.util.regex.Pattern -import scala.collection.Seq -import scala.jdk.CollectionConverters._ -import scala.math.Ordering.Implicits.infixOrderingOps - -object GetOffsetShell { - private val TopicPartitionPattern = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?") - - def main(args: Array[String]): Unit = { - try { - fetchOffsets(args) - } catch { - case e: Exception => - println(s"Error occurred: ${e.getMessage}") - Exit.exit(1, Some(e.getMessage)) - } - } - - private[tools] def fetchOffsets(args: Array[String]): Unit = { - val parser = new OptionParser(false) - val brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.") - .withRequiredArg - .describedAs("HOST1:PORT1,...,HOST3:PORT3") - .ofType(classOf[String]) - val bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.") - .requiredUnless("broker-list") - .withRequiredArg - .describedAs("HOST1:PORT1,...,HOST3:PORT3") - .ofType(classOf[String]) - val topicPartitionsOpt = parser.accepts("topic-partitions", s"Comma separated list of topic-partition patterns to get the offsets for, with the format of '$TopicPartitionPattern'." + - " The first group is an optional regex for the topic name, if omitted, it matches any topic name." + - " The section after ':' describes a 'partition' pattern, which can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions.") - .withRequiredArg - .describedAs("topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3") - .ofType(classOf[String]) - val topicOpt = parser.accepts("topic", s"The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.") - .withRequiredArg - .describedAs("topic") - .ofType(classOf[String]) - val partitionsOpt = parser.accepts("partitions", s"Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.") - .withRequiredArg - .describedAs("partition ids") - .ofType(classOf[String]) - val timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]") - .withRequiredArg - .describedAs(" / -1 or latest / -2 or earliest / -3 or max-timestamp") - .ofType(classOf[String]) - .defaultsTo("latest") - val commandConfigOpt = parser.accepts("command-config", s"Property file containing configs to be passed to Admin Client.") - .withRequiredArg - .describedAs("config file") - .ofType(classOf[String]) - val excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", s"By default, internal topics are included. If specified, internal topics are excluded.") - - if (args.isEmpty) - CommandLineUtils.printUsageAndExit(parser, "An interactive shell for getting topic-partition offsets.") - - val options = parser.parse(args : _*) - - val effectiveBrokerListOpt = if (options.has(bootstrapServerOpt)) - bootstrapServerOpt - else - brokerListOpt - - CommandLineUtils.checkRequiredArgs(parser, options, effectiveBrokerListOpt) - - val clientId = "GetOffsetShell" - val brokerList = options.valueOf(effectiveBrokerListOpt) - - ToolsUtils.validatePortOrDie(parser, brokerList) - val excludeInternalTopics = options.has(excludeInternalTopicsOpt) - - if (options.has(topicPartitionsOpt) && (options.has(topicOpt) || options.has(partitionsOpt))) { - throw new IllegalArgumentException("--topic-partitions cannot be used with --topic or --partitions") - } - - val offsetSpec = parseOffsetSpec(options.valueOf(timeOpt)) - - val topicPartitionFilter = if (options.has(topicPartitionsOpt)) { - createTopicPartitionFilterWithPatternList(options.valueOf(topicPartitionsOpt)) - } else { - createTopicPartitionFilterWithTopicAndPartitionPattern( - if (options.has(topicOpt)) Some(options.valueOf(topicOpt)) else None, - options.valueOf(partitionsOpt) - ) - } - - val config = if (options.has(commandConfigOpt)) - Utils.loadProps(options.valueOf(commandConfigOpt)) - else - new Properties - config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) - config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId) - val adminClient = Admin.create(config) - - try { - val partitionInfos = listPartitionInfos(adminClient, topicPartitionFilter, excludeInternalTopics) - - if (partitionInfos.isEmpty) { - throw new IllegalArgumentException("Could not match any topic-partitions with the specified filters") - } - - val timestampsToSearch = partitionInfos.map(tp => tp -> offsetSpec).toMap.asJava - - val listOffsetsResult = adminClient.listOffsets(timestampsToSearch) - val partitionOffsets = partitionInfos.flatMap { tp => - try { - val partitionInfo = listOffsetsResult.partitionResult(tp).get - if (partitionInfo.offset != ListOffsetsResponse.UNKNOWN_OFFSET) { - Some((tp, partitionInfo.offset)) - } else { - None - } - } catch { - case e: ExecutionException => - e.getCause match { - case cause: KafkaException => - System.err.println(s"Skip getting offsets for topic-partition ${tp.topic}:${tp.partition} due to error: ${cause.getMessage}") - case _ => - throw e - } - None - } - } - - partitionOffsets.sortWith((tp1, tp2) => compareTopicPartitions(tp1._1, tp2._1)).foreach { - case (tp, offset) => println(s"${tp.topic}:${tp.partition}:${Option(offset).getOrElse("")}") - } - } finally { - adminClient.close() - } - } - - private def parseOffsetSpec(listOffsetsTimestamp: String): OffsetSpec = { - listOffsetsTimestamp match { - case "earliest" => OffsetSpec.earliest() - case "latest" => OffsetSpec.latest() - case "max-timestamp" => OffsetSpec.maxTimestamp() - case _ => - try { - listOffsetsTimestamp.toLong match { - case ListOffsetsRequest.EARLIEST_TIMESTAMP => OffsetSpec.earliest() - case ListOffsetsRequest.LATEST_TIMESTAMP => OffsetSpec.latest() - case ListOffsetsRequest.MAX_TIMESTAMP => OffsetSpec.maxTimestamp() - case value => OffsetSpec.forTimestamp(value) - } - } catch { - case e: NumberFormatException => - throw new IllegalArgumentException(s"Malformed time argument $listOffsetsTimestamp, please use -1 or latest / -2 or earliest / -3 or max-timestamp, or a specified long format timestamp", e) - } - } - } - - def compareTopicPartitions(a: TopicPartition, b: TopicPartition): Boolean = { - (a.topic(), a.partition()) < (b.topic(), b.partition()) - } - - /** - * Creates a topic-partition filter based on a list of patterns. - * Expected format: - * List: TopicPartitionPattern(, TopicPartitionPattern)* - * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern - * TopicPattern: REGEX - * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER - */ - def createTopicPartitionFilterWithPatternList( - topicPartitions: String - ): TopicPartitionFilter = { - val ruleSpecs = topicPartitions.split(",") - val rules = ruleSpecs.toSeq.map(ruleSpec => parseRuleSpec(ruleSpec)) - new CompositeTopicPartitionFilter(rules.asJava) - } - - def parseRuleSpec(ruleSpec: String): TopicPartitionFilter = { - val matcher = TopicPartitionPattern.matcher(ruleSpec) - if (!matcher.matches()) - throw new IllegalArgumentException(s"Invalid rule specification: $ruleSpec") - - def group(group: Int): Option[String] = { - Option(matcher.group(group)).filter(s => s != null && s.nonEmpty) - } - - val topicFilter = new IncludeList(group(1).getOrElse(".*")) - val partitionFilter = group(2).map(_.toInt) match { - case Some(partition) => - new UniquePartitionFilter(partition) - case None => - val lowerRange = group(3).map(_.toInt).getOrElse(0) - val upperRange = group(4).map(_.toInt).getOrElse(Int.MaxValue) - new PartitionRangeFilter(lowerRange, upperRange) - } - new TopicFilterAndPartitionFilter( - topicFilter, - partitionFilter - ) - } - - /** - * Creates a topic-partition filter based on a topic pattern and a set of partition ids. - */ - def createTopicPartitionFilterWithTopicAndPartitionPattern( - topicOpt: Option[String], - partitionIds: String - ): TopicFilterAndPartitionFilter = { - new TopicFilterAndPartitionFilter( - new IncludeList(topicOpt.getOrElse(".*")), - new PartitionsSetFilter(createPartitionSet(partitionIds).asJava) - ) - } - - def createPartitionSet(partitionsString: String): Set[Integer] = { - if (partitionsString == null || partitionsString.isEmpty) - Set.empty - else - partitionsString.split(",").map { partitionString => - try Integer.valueOf(partitionString) - catch { - case _: NumberFormatException => - throw new IllegalArgumentException(s"--partitions expects a comma separated list of numeric " + - s"partition ids, but received: $partitionsString") - } - }.toSet - } - - /** - * Return the partition infos. Filter them with topicPartitionFilter. - */ - private def listPartitionInfos( - client: Admin, - topicPartitionFilter: TopicPartitionFilter, - excludeInternalTopics: Boolean - ): Seq[TopicPartition] = { - val listTopicsOptions = new ListTopicsOptions().listInternal(!excludeInternalTopics) - val topics = client.listTopics(listTopicsOptions).names.get - val filteredTopics = topics.asScala.filter(topicPartitionFilter.isTopicAllowed) - - client.describeTopics(filteredTopics.asJava).allTopicNames.get.asScala.flatMap { case (topic, description) => - description - .partitions - .asScala - .map(tp => new TopicPartition(topic, tp.partition)) - .filter(topicPartitionFilter.isTopicPartitionAllowed) - }.toBuffer - } -} - diff --git a/core/src/test/scala/kafka/tools/GetOffsetShellParsingTest.scala b/core/src/test/scala/kafka/tools/GetOffsetShellParsingTest.scala deleted file mode 100644 index 889631be19332..0000000000000 --- a/core/src/test/scala/kafka/tools/GetOffsetShellParsingTest.scala +++ /dev/null @@ -1,264 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.tools - -import org.apache.kafka.common.TopicPartition -import org.junit.jupiter.api.Assertions.{assertFalse, assertThrows, assertTrue} -import org.junit.jupiter.api.Test - -class GetOffsetShellParsingTest { - - @Test - def testTopicPartitionFilterForTopicName(): Unit = { - val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList("test") - - assertTrue(topicPartitionFilter.isTopicAllowed("test")) - assertFalse(topicPartitionFilter.isTopicAllowed("test1")) - assertFalse(topicPartitionFilter.isTopicAllowed("__consumer_offsets")) - - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 0))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 1))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test1", 0))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 0))) - } - - @Test - def testTopicPartitionFilterForInternalTopicName(): Unit = { - val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList("__consumer_offsets") - - assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets")) - assertFalse(topicPartitionFilter.isTopicAllowed("test1")) - assertFalse(topicPartitionFilter.isTopicAllowed("test2")) - - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 0))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 1))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test1", 0))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test2", 0))) - } - - @Test - def testTopicPartitionFilterForTopicNameList(): Unit = { - val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList("test,test1,__consumer_offsets") - - assertTrue(topicPartitionFilter.isTopicAllowed("test")) - assertTrue(topicPartitionFilter.isTopicAllowed("test1")) - assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets")) - assertFalse(topicPartitionFilter.isTopicAllowed("test2")) - - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 0))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test1", 1))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 0))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test2", 0))) - } - - @Test - def testTopicPartitionFilterForRegex(): Unit = { - val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList("test.*") - - assertTrue(topicPartitionFilter.isTopicAllowed("test")) - assertTrue(topicPartitionFilter.isTopicAllowed("test1")) - assertTrue(topicPartitionFilter.isTopicAllowed("test2")) - assertFalse(topicPartitionFilter.isTopicAllowed("__consumer_offsets")) - - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 0))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test1", 1))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test2", 0))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 0))) - } - - @Test - def testTopicPartitionFilterForPartitionIndexSpec(): Unit = { - val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList(":0") - - assertTrue(topicPartitionFilter.isTopicAllowed("test")) - assertTrue(topicPartitionFilter.isTopicAllowed("test1")) - assertTrue(topicPartitionFilter.isTopicAllowed("test2")) - assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets")) - - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 0))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test1", 0))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test2", 1))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 0))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 1))) - } - - @Test - def testTopicPartitionFilterForPartitionRangeSpec(): Unit = { - val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList(":1-3") - - assertTrue(topicPartitionFilter.isTopicAllowed("test")) - assertTrue(topicPartitionFilter.isTopicAllowed("test1")) - assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets")) - assertTrue(topicPartitionFilter.isTopicAllowed("test2")) - - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 1))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test1", 2))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 2))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test2", 0))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test2", 3))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 3))) - } - - @Test - def testTopicPartitionFilterForPartitionLowerBoundSpec(): Unit = { - val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList(":1-") - - assertTrue(topicPartitionFilter.isTopicAllowed("test")) - assertTrue(topicPartitionFilter.isTopicAllowed("test1")) - assertTrue(topicPartitionFilter.isTopicAllowed("test2")) - assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets")) - - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 1))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test1", 2))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 2))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test2", 0))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 0))) - } - - @Test - def testTopicPartitionFilterForPartitionUpperBoundSpec(): Unit = { - val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList(":-3") - assertTrue(topicPartitionFilter.isTopicAllowed("test")) - assertTrue(topicPartitionFilter.isTopicAllowed("test1")) - assertTrue(topicPartitionFilter.isTopicAllowed("test2")) - assertTrue(topicPartitionFilter.isTopicAllowed("test3")) - assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets")) - - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 0))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test1", 1))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test2", 2))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 2))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test3", 3))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 3))) - } - - @Test - def testTopicPartitionFilterComplex(): Unit = { - val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList("test.*:0,__consumer_offsets:1-2,.*:3") - - assertTrue(topicPartitionFilter.isTopicAllowed("test")) - assertTrue(topicPartitionFilter.isTopicAllowed("test1")) - assertTrue(topicPartitionFilter.isTopicAllowed("custom")) - assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets")) - - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 0))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test1", 0))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 1))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test1", 1))) - - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("custom", 3))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("custom", 0))) - - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 1))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 3))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 0))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("__consumer_offsets", 2))) - } - - @Test - def testPartitionFilterForSingleIndex(): Unit = { - val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList(":1") - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 1))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 0))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 2))) - } - - @Test - def testPartitionFilterForRange(): Unit = { - val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList(":1-3") - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 0))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 1))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 2))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 3))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 4))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 5))) - } - - @Test - def testPartitionFilterForLowerBound(): Unit = { - val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList(":3-") - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 0))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 1))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 2))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 3))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 4))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 5))) - } - - @Test - def testPartitionFilterForUpperBound(): Unit = { - val topicPartitionFilter = GetOffsetShell.createTopicPartitionFilterWithPatternList(":-3") - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 0))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 1))) - assertTrue(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 2))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 3))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 4))) - assertFalse(topicPartitionFilter.isTopicPartitionAllowed(topicPartition("test", 5))) - } - - @Test - def testPartitionsSetFilter(): Unit = { - val partitionsSetFilter = GetOffsetShell.createTopicPartitionFilterWithTopicAndPartitionPattern(Some("topic"), "1,3,5") - - assertFalse(partitionsSetFilter.isTopicPartitionAllowed(topicPartition("topic", 0))) - assertFalse(partitionsSetFilter.isTopicPartitionAllowed(topicPartition("topic", 2))) - assertFalse(partitionsSetFilter.isTopicPartitionAllowed(topicPartition("topic", 4))) - - assertFalse(partitionsSetFilter.isTopicPartitionAllowed(topicPartition("topic1", 1))) - assertFalse(partitionsSetFilter.isTopicAllowed("topic1")) - - assertTrue(partitionsSetFilter.isTopicPartitionAllowed(topicPartition("topic", 1))) - assertTrue(partitionsSetFilter.isTopicPartitionAllowed(topicPartition("topic", 3))) - assertTrue(partitionsSetFilter.isTopicPartitionAllowed(topicPartition("topic", 5))) - assertTrue(partitionsSetFilter.isTopicAllowed("topic")) - } - - @Test - def testPartitionFilterForInvalidSingleIndex(): Unit = { - assertThrows(classOf[IllegalArgumentException], - () => GetOffsetShell.createTopicPartitionFilterWithPatternList(":a")) - } - - @Test - def testPartitionFilterForInvalidRange(): Unit = { - assertThrows(classOf[IllegalArgumentException], - () => GetOffsetShell.createTopicPartitionFilterWithPatternList(":a-b")) - } - - @Test - def testPartitionFilterForInvalidLowerBound(): Unit = { - assertThrows(classOf[IllegalArgumentException], - () => GetOffsetShell.createTopicPartitionFilterWithPatternList(":a-")) - } - - @Test - def testPartitionFilterForInvalidUpperBound(): Unit = { - assertThrows(classOf[IllegalArgumentException], - () => GetOffsetShell.createTopicPartitionFilterWithPatternList(":-b")) - } - - @Test - def testInvalidTimeValue(): Unit = { - assertThrows(classOf[IllegalArgumentException], - () => GetOffsetShell.fetchOffsets(Array("--bootstrap-server", "localhost:9092", "--time", "invalid"))) - } - - private def topicPartition(topic: String, partition: Int): TopicPartition = { - new TopicPartition(topic, partition) - } -} diff --git a/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala b/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala deleted file mode 100644 index cbce573192748..0000000000000 --- a/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala +++ /dev/null @@ -1,278 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.tools - -import java.util.Properties -import kafka.integration.KafkaServerTestHarness -import kafka.server.KafkaConfig -import kafka.utils.{Exit, Logging, TestUtils} -import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} -import org.apache.kafka.common.serialization.StringSerializer -import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} -import org.junit.jupiter.api.{BeforeEach, Test, TestInfo} -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource - -class GetOffsetShellTest extends KafkaServerTestHarness with Logging { - private val topicCount = 4 - private val offsetTopicPartitionCount = 4 - - override def generateConfigs: collection.Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect) - .map { p => - p.put(KafkaConfig.OffsetsTopicPartitionsProp, Int.box(offsetTopicPartitionCount)) - p - }.map(KafkaConfig.fromProps) - - @BeforeEach - override def setUp(testInfo: TestInfo): Unit = { - super.setUp(testInfo) - Range(1, topicCount + 1).foreach(i => createTopic(topicName(i), i)) - - val props = new Properties() - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers()) - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer]) - - // Send X messages to each partition of topicX - val producer = new KafkaProducer[String, String](props) - Range(1, topicCount + 1).foreach(i => Range(0, i*i) - .foreach(msgCount => producer.send(new ProducerRecord[String, String](topicName(i), msgCount % i, null, "val" + msgCount)))) - producer.close() - - TestUtils.createOffsetsTopic(zkClient, servers) - } - - @Test - def testNoFilterOptions(): Unit = { - val offsets = executeAndParse(Array()) - assertEquals(expectedOffsetsWithInternal(), offsets) - } - - @Test - def testInternalExcluded(): Unit = { - val offsets = executeAndParse(Array("--exclude-internal-topics")) - assertEquals(expectedTestTopicOffsets(), offsets) - } - - @Test - def testTopicNameArg(): Unit = { - Range(1, topicCount + 1).foreach(i => { - val offsets = executeAndParse(Array("--topic", topicName(i))) - assertEquals(expectedOffsetsForTopic(i), offsets, () => "Offset output did not match for " + topicName(i)) - }) - } - - @Test - def testTopicPatternArg(): Unit = { - val offsets = executeAndParse(Array("--topic", "topic.*")) - assertEquals(expectedTestTopicOffsets(), offsets) - } - - @Test - def testPartitionsArg(): Unit = { - val offsets = executeAndParse(Array("--partitions", "0,1")) - assertEquals(expectedOffsetsWithInternal().filter { case (_, partition, _) => partition <= 1 }, offsets) - } - - @Test - def testTopicPatternArgWithPartitionsArg(): Unit = { - val offsets = executeAndParse(Array("--topic", "topic.*", "--partitions", "0,1")) - assertEquals(expectedTestTopicOffsets().filter { case (_, partition, _) => partition <= 1 }, offsets) - } - - @Test - def testTopicPartitionsArg(): Unit = { - val offsets = executeAndParse(Array("--topic-partitions", "topic1:0,topic2:1,topic(3|4):2,__.*:3")) - assertEquals( - List( - ("__consumer_offsets", 3, Some(0)), - ("topic1", 0, Some(1)), - ("topic2", 1, Some(2)), - ("topic3", 2, Some(3)), - ("topic4", 2, Some(4)) - ), - offsets - ) - } - - @ParameterizedTest - @ValueSource(strings = Array("-1", "latest")) - def testGetLatestOffsets(time: String): Unit = { - val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", "--time", time)) - assertEquals( - List( - ("topic1", 0, Some(1)), - ("topic2", 0, Some(2)), - ("topic3", 0, Some(3)), - ("topic4", 0, Some(4)) - ), - offsets - ) - } - - @ParameterizedTest - @ValueSource(strings = Array("-2", "earliest")) - def testGetEarliestOffsets(time: String): Unit = { - val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", "--time", time)) - assertEquals( - List( - ("topic1", 0, Some(0)), - ("topic2", 0, Some(0)), - ("topic3", 0, Some(0)), - ("topic4", 0, Some(0)) - ), - offsets - ) - } - - @ParameterizedTest - @ValueSource(strings = Array("-3", "max-timestamp")) - def testGetOffsetsByMaxTimestamp(time: String): Unit = { - val offsets = executeAndParse(Array("--topic-partitions", "topic.*", "--time", time)) - offsets.foreach { case (topic, _, timestampOpt) => - // We can't know the exact offsets with max timestamp - assertTrue(timestampOpt.get >= 0 && timestampOpt.get <= topic.replace("topic", "").toInt) - } - } - - @Test - def testGetOffsetsByTimestamp(): Unit = { - val time = (System.currentTimeMillis() / 2).toString - val offsets = executeAndParse(Array("--topic-partitions", "topic.*:0", "--time", time)) - assertEquals( - List( - ("topic1", 0, Some(0)), - ("topic2", 0, Some(0)), - ("topic3", 0, Some(0)), - ("topic4", 0, Some(0)) - ), - offsets - ) - } - - @Test - def testNoOffsetIfTimestampGreaterThanLatestRecord(): Unit = { - val time = (System.currentTimeMillis() * 2).toString - val offsets = executeAndParse(Array("--topic-partitions", "topic.*", "--time", time)) - assertEquals(List.empty, offsets) - } - - @Test - def testTopicPartitionsArgWithInternalExcluded(): Unit = { - val offsets = executeAndParse(Array("--topic-partitions", - "topic1:0,topic2:1,topic(3|4):2,__.*:3", "--exclude-internal-topics")) - assertEquals( - List( - ("topic1", 0, Some(1)), - ("topic2", 1, Some(2)), - ("topic3", 2, Some(3)), - ("topic4", 2, Some(4)) - ), - offsets - ) - } - - @Test - def testTopicPartitionsArgWithInternalIncluded(): Unit = { - val offsets = executeAndParse(Array("--topic-partitions", "__.*:0")) - assertEquals(List(("__consumer_offsets", 0, Some(0))), offsets) - } - - @Test - def testTopicPartitionsNotFoundForNonExistentTopic(): Unit = { - assertExitCodeIsOne(Array("--topic", "some_nonexistent_topic")) - } - - @Test - def testTopicPartitionsNotFoundForExcludedInternalTopic(): Unit = { - assertExitCodeIsOne(Array("--topic", "some_nonexistent_topic:*")) - } - - @Test - def testTopicPartitionsNotFoundForNonMatchingTopicPartitionPattern(): Unit = { - assertExitCodeIsOne(Array("--topic-partitions", "__consumer_offsets", "--exclude-internal-topics")) - } - - @Test - def testTopicPartitionsFlagWithTopicFlagCauseExit(): Unit = { - assertExitCodeIsOne(Array("--topic-partitions", "__consumer_offsets", "--topic", "topic1")) - } - - @Test - def testTopicPartitionsFlagWithPartitionsFlagCauseExit(): Unit = { - assertExitCodeIsOne(Array("--topic-partitions", "__consumer_offsets", "--partitions", "0")) - } - - private def expectedOffsetsWithInternal(): List[(String, Int, Option[Long])] = { - Range(0, offsetTopicPartitionCount).map(i => ("__consumer_offsets", i, Some(0L))).toList ++ expectedTestTopicOffsets() - } - - private def expectedTestTopicOffsets(): List[(String, Int, Option[Long])] = { - Range(1, topicCount + 1).flatMap(i => expectedOffsetsForTopic(i)).toList - } - - private def expectedOffsetsForTopic(i: Int): List[(String, Int, Option[Long])] = { - val name = topicName(i) - Range(0, i).map(p => (name, p, Some(i.toLong))).toList - } - - private def topicName(i: Int): String = "topic" + i - - private def assertExitCodeIsOne(args: Array[String]): Unit = { - var exitStatus: Option[Int] = None - Exit.setExitProcedure { (status, _) => - exitStatus = Some(status) - throw new RuntimeException - } - - try { - GetOffsetShell.main(addBootstrapServer(args)) - } catch { - case e: RuntimeException => - } finally { - Exit.resetExitProcedure() - } - - assertEquals(Some(1), exitStatus) - } - - private def executeAndParse(args: Array[String]): List[(String, Int, Option[Long])] = { - val output = executeAndGrabOutput(args) - output.split(System.lineSeparator()) - .map(_.split(":")) - .filter(_.length >= 2) - .map { line => - val topic = line(0) - val partition = line(1).toInt - val timestamp = if (line.length == 2 || line(2).isEmpty) None else Some(line(2).toLong) - (topic, partition, timestamp) - } - .toList - } - - private def executeAndGrabOutput(args: Array[String]): String = { - TestUtils.grabConsoleOutput(GetOffsetShell.main(addBootstrapServer(args))) - } - - private def addBootstrapServer(args: Array[String]): Array[String] = { - args ++ Array("--bootstrap-server", bootstrapServers()) - } -} - - diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index e67bde8571ce2..ab48d1091d73b 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -1777,8 +1777,7 @@ def get_offset_shell(self, time=None, topic=None, partitions=None, topic_partiti node = self.nodes[0] cmd = fix_opts_for_new_jvm(node) - cmd += self.path.script("kafka-run-class.sh", node) - cmd += " kafka.tools.GetOffsetShell" + cmd += self.path.script("kafka-get-offsets.sh", node) cmd += " --bootstrap-server %s" % self.bootstrap_servers(self.security_protocol) if time: diff --git a/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java b/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java new file mode 100644 index 0000000000000..99551ca0545a4 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools; + +import joptsimple.OptionException; +import joptsimple.OptionSpec; +import joptsimple.OptionSpecBuilder; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo; +import org.apache.kafka.clients.admin.ListTopicsOptions; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.PartitionFilter; +import org.apache.kafka.server.util.PartitionFilter.PartitionRangeFilter; +import org.apache.kafka.server.util.PartitionFilter.PartitionsSetFilter; +import org.apache.kafka.server.util.PartitionFilter.UniquePartitionFilter; +import org.apache.kafka.server.util.TopicFilter.IncludeList; +import org.apache.kafka.server.util.TopicPartitionFilter; +import org.apache.kafka.server.util.TopicPartitionFilter.CompositeTopicPartitionFilter; +import org.apache.kafka.server.util.TopicPartitionFilter.TopicFilterAndPartitionFilter; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.function.IntFunction; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class GetOffsetShell { + private static final Pattern TOPIC_PARTITION_PATTERN = Pattern.compile("([^:,]*)(?::(?:([0-9]*)|(?:([0-9]*)-([0-9]*))))?"); + + public static void main(String... args) { + Exit.exit(mainNoExit(args)); + } + + static int mainNoExit(String... args) { + try { + execute(args); + return 0; + } catch (TerseException e) { + System.err.println("Error occurred: " + e.getMessage()); + return 1; + } catch (Throwable e) { + System.err.println("Error occurred: " + e.getMessage()); + System.err.println(Utils.stackTrace(e)); + return 1; + } + } + + static void execute(String... args) throws IOException, ExecutionException, InterruptedException, TerseException { + GetOffsetShell getOffsetShell = new GetOffsetShell(); + + GetOffsetShellOptions options = new GetOffsetShellOptions(args); + + Map partitionOffsets = getOffsetShell.fetchOffsets(options); + + for (Map.Entry entry : partitionOffsets.entrySet()) { + TopicPartition topic = entry.getKey(); + + System.out.println(String.join(":", new String[]{topic.topic(), String.valueOf(topic.partition()), entry.getValue().toString()})); + } + } + + private static class GetOffsetShellOptions extends CommandDefaultOptions { + private final OptionSpec brokerListOpt; + private final OptionSpec bootstrapServerOpt; + private final OptionSpec topicPartitionsOpt; + private final OptionSpec topicOpt; + private final OptionSpec partitionsOpt; + private final OptionSpec timeOpt; + private final OptionSpec commandConfigOpt; + private final OptionSpec effectiveBrokerListOpt; + private final OptionSpecBuilder excludeInternalTopicsOpt; + + public GetOffsetShellOptions(String[] args) throws TerseException { + super(args); + + brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.") + .withRequiredArg() + .describedAs("HOST1:PORT1,...,HOST3:PORT3") + .ofType(String.class); + bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.") + .requiredUnless("broker-list") + .withRequiredArg() + .describedAs("HOST1:PORT1,...,HOST3:PORT3") + .ofType(String.class); + topicPartitionsOpt = parser.accepts("topic-partitions", "Comma separated list of topic-partition patterns to get the offsets for, with the format of '" + TOPIC_PARTITION_PATTERN + "'." + + " The first group is an optional regex for the topic name, if omitted, it matches any topic name." + + " The section after ':' describes a 'partition' pattern, which can be: a number, a range in the format of 'NUMBER-NUMBER' (lower inclusive, upper exclusive), an inclusive lower bound in the format of 'NUMBER-', an exclusive upper bound in the format of '-NUMBER' or may be omitted to accept all partitions.") + .withRequiredArg() + .describedAs("topic1:1,topic2:0-3,topic3,topic4:5-,topic5:-3") + .ofType(String.class); + topicOpt = parser.accepts("topic", "The topic to get the offsets for. It also accepts a regular expression. If not present, all authorized topics are queried. Cannot be used if --topic-partitions is present.") + .withRequiredArg() + .describedAs("topic") + .ofType(String.class); + partitionsOpt = parser.accepts("partitions", "Comma separated list of partition ids to get the offsets for. If not present, all partitions of the authorized topics are queried. Cannot be used if --topic-partitions is present.") + .withRequiredArg() + .describedAs("partition ids") + .ofType(String.class); + timeOpt = parser.accepts("time", "timestamp of the offsets before that. [Note: No offset is returned, if the timestamp greater than recently committed record timestamp is given.]") + .withRequiredArg() + .describedAs(" / -1 or latest / -2 or earliest / -3 or max-timestamp") + .ofType(String.class) + .defaultsTo("latest"); + commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.") + .withRequiredArg() + .describedAs("config file") + .ofType(String.class); + excludeInternalTopicsOpt = parser.accepts("exclude-internal-topics", "By default, internal topics are included. If specified, internal topics are excluded."); + + if (args.length == 0) { + CommandLineUtils.printUsageAndExit(parser, "An interactive shell for getting topic-partition offsets."); + } + + try { + options = parser.parse(args); + } catch (OptionException e) { + throw new TerseException(e.getMessage()); + } + + if (options.has(bootstrapServerOpt)) { + effectiveBrokerListOpt = bootstrapServerOpt; + } else { + effectiveBrokerListOpt = brokerListOpt; + } + + CommandLineUtils.checkRequiredArgs(parser, options, effectiveBrokerListOpt); + + String brokerList = options.valueOf(effectiveBrokerListOpt); + + try { + ToolsUtils.validateBootstrapServer(brokerList); + } catch (IllegalArgumentException e) { + CommandLineUtils.printUsageAndExit(parser, e.getMessage()); + } + } + + public boolean hasTopicPartitionsOpt() { + return options.has(topicPartitionsOpt); + } + + public String topicPartitionsOpt() { + return options.valueOf(topicPartitionsOpt); + } + + public boolean hasTopicOpt() { + return options.has(topicOpt); + } + + public String topicOpt() { + return options.valueOf(topicOpt); + } + + public boolean hasPartitionsOpt() { + return options.has(partitionsOpt); + } + + public String partitionsOpt() { + return options.valueOf(partitionsOpt); + } + + public String timeOpt() { + return options.valueOf(timeOpt); + } + + public boolean hasCommandConfigOpt() { + return options.has(commandConfigOpt); + } + + public String commandConfigOpt() { + return options.valueOf(commandConfigOpt); + } + + public String effectiveBrokerListOpt() { + return options.valueOf(effectiveBrokerListOpt); + } + + public boolean hasExcludeInternalTopicsOpt() { + return options.has(excludeInternalTopicsOpt); + } + } + + public Map fetchOffsets(GetOffsetShellOptions options) throws IOException, ExecutionException, InterruptedException, TerseException { + String clientId = "GetOffsetShell"; + String brokerList = options.effectiveBrokerListOpt(); + + if (options.hasTopicPartitionsOpt() && (options.hasTopicOpt() || options.hasPartitionsOpt())) { + throw new TerseException("--topic-partitions cannot be used with --topic or --partitions"); + } + + boolean excludeInternalTopics = options.hasExcludeInternalTopicsOpt(); + OffsetSpec offsetSpec = parseOffsetSpec(options.timeOpt()); + + TopicPartitionFilter topicPartitionFilter; + + if (options.hasTopicPartitionsOpt()) { + topicPartitionFilter = createTopicPartitionFilterWithPatternList(options.topicPartitionsOpt()); + } else { + topicPartitionFilter = createTopicPartitionFilterWithTopicAndPartitionPattern(options.topicOpt(), options.partitionsOpt()); + } + + Properties config = options.hasCommandConfigOpt() ? Utils.loadProps(options.commandConfigOpt()) : new Properties(); + config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); + config.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, clientId); + + try (Admin adminClient = Admin.create(config)) { + List partitionInfos = listPartitionInfos(adminClient, topicPartitionFilter, excludeInternalTopics); + + if (partitionInfos.isEmpty()) { + throw new TerseException("Could not match any topic-partitions with the specified filters"); + } + + Map timestampsToSearch = partitionInfos.stream().collect(Collectors.toMap(tp -> tp, tp -> offsetSpec)); + + ListOffsetsResult listOffsetsResult = adminClient.listOffsets(timestampsToSearch); + + TreeMap partitionOffsets = new TreeMap<>(Comparator.comparing(TopicPartition::toString)); + + for (TopicPartition partition : partitionInfos) { + ListOffsetsResultInfo partitionInfo; + + try { + partitionInfo = listOffsetsResult.partitionResult(partition).get(); + } catch (ExecutionException e) { + if (e.getCause() instanceof KafkaException) { + System.err.println("Skip getting offsets for topic-partition " + partition.toString() + " due to error: " + e.getMessage()); + } else { + throw e; + } + + continue; + } + + if (partitionInfo.offset() != ListOffsetsResponse.UNKNOWN_OFFSET) { + partitionOffsets.put(partition, partitionInfo.offset()); + } + } + + return partitionOffsets; + } + } + + private OffsetSpec parseOffsetSpec(String listOffsetsTimestamp) throws TerseException { + switch (listOffsetsTimestamp) { + case "earliest": + return OffsetSpec.earliest(); + case "latest": + return OffsetSpec.latest(); + case "max-timestamp": + return OffsetSpec.maxTimestamp(); + default: + long timestamp; + + try { + timestamp = Long.parseLong(listOffsetsTimestamp); + } catch (NumberFormatException e) { + throw new TerseException("Malformed time argument " + listOffsetsTimestamp + ". " + + "Please use -1 or latest / -2 or earliest / -3 or max-timestamp, or a specified long format timestamp"); + } + + if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP) { + return OffsetSpec.earliest(); + } else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP) { + return OffsetSpec.latest(); + } else if (timestamp == ListOffsetsRequest.MAX_TIMESTAMP) { + return OffsetSpec.maxTimestamp(); + } else { + return OffsetSpec.forTimestamp(timestamp); + } + } + } + + /** + * Creates a topic-partition filter based on a list of patterns. + * Expected format: + * List: TopicPartitionPattern(, TopicPartitionPattern)* + * TopicPartitionPattern: TopicPattern(:PartitionPattern)? | :PartitionPattern + * TopicPattern: REGEX + * PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER + */ + public TopicPartitionFilter createTopicPartitionFilterWithPatternList(String topicPartitions) { + List ruleSpecs = Arrays.asList(topicPartitions.split(",")); + List rules = ruleSpecs.stream().map(ruleSpec -> { + try { + return parseRuleSpec(ruleSpec); + } catch (TerseException e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + + return new CompositeTopicPartitionFilter(rules); + } + + /** + * Creates a topic-partition filter based on a topic pattern and a set of partition ids. + */ + public TopicPartitionFilter createTopicPartitionFilterWithTopicAndPartitionPattern(String topicOpt, String partitionIds) throws TerseException { + return new TopicFilterAndPartitionFilter( + new IncludeList(topicOpt != null ? topicOpt : ".*"), + new PartitionsSetFilter(createPartitionSet(partitionIds)) + ); + } + + private Set createPartitionSet(String partitionsString) throws TerseException { + Set partitions; + + if (partitionsString == null || partitionsString.isEmpty()) { + partitions = Collections.emptySet(); + } else { + try { + partitions = Arrays.stream(partitionsString.split(",")).map(Integer::parseInt).collect(Collectors.toSet()); + } catch (NumberFormatException e) { + throw new TerseException("--partitions expects a comma separated list of numeric " + + "partition ids, but received: " + partitionsString); + } + } + + return partitions; + } + + /** + * Return the partition infos. Filter them with topicPartitionFilter. + */ + private List listPartitionInfos( + Admin client, + TopicPartitionFilter topicPartitionFilter, + boolean excludeInternalTopics + ) throws ExecutionException, InterruptedException { + ListTopicsOptions listTopicsOptions = new ListTopicsOptions().listInternal(!excludeInternalTopics); + Set topics = client.listTopics(listTopicsOptions).names().get(); + Set filteredTopics = topics.stream().filter(topicPartitionFilter::isTopicAllowed).collect(Collectors.toSet()); + + return client.describeTopics(filteredTopics).allTopicNames().get().entrySet().stream().flatMap( + topic -> topic.getValue().partitions().stream().map( + tp -> new TopicPartition(topic.getKey(), tp.partition()) + ).filter(topicPartitionFilter::isTopicPartitionAllowed) + ).collect(Collectors.toList()); + } + + private TopicPartitionFilter parseRuleSpec(String ruleSpec) throws TerseException, RuntimeException { + Matcher matcher = TOPIC_PARTITION_PATTERN.matcher(ruleSpec); + + if (!matcher.matches()) + throw new TerseException("Invalid rule specification: " + ruleSpec); + + IntFunction group = (int g) -> (matcher.group(g) != null && !matcher.group(g).isEmpty()) ? matcher.group(g) : null; + + IncludeList topicFilter = group.apply(1) != null ? new IncludeList(group.apply(1)) : new IncludeList(".*"); + + PartitionFilter partitionFilter; + + if (group.apply(2) != null) { + partitionFilter = new UniquePartitionFilter(Integer.parseInt(group.apply(2))); + } else { + int lowerRange = group.apply(3) != null ? Integer.parseInt(group.apply(3)) : 0; + int upperRange = group.apply(4) != null ? Integer.parseInt(group.apply(4)) : Integer.MAX_VALUE; + + partitionFilter = new PartitionRangeFilter(lowerRange, upperRange); + } + + return new TopicPartitionFilter.TopicFilterAndPartitionFilter(topicFilter, partitionFilter); + } +} diff --git a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java index 446c5fd67bf9d..c3e1cc1f7a080 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java +++ b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.tools; -import joptsimple.OptionParser; import joptsimple.OptionSpec; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.ClientRequest; @@ -62,7 +61,6 @@ import java.net.SocketTimeoutException; import java.text.SimpleDateFormat; -import java.util.Arrays; import java.util.Collection; import java.util.Date; import java.util.HashMap; @@ -312,33 +310,19 @@ private static class ReplicaVerificationToolOptions extends CommandDefaultOption } CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt); CommandLineUtils.checkInvalidArgs(parser, options, topicsIncludeOpt, topicWhiteListOpt); + } String brokerHostsAndPorts() { String brokerList = options.valueOf(brokerListOpt); - validateBrokerList(parser, brokerList); - return brokerList; - } - void validateBrokerList(OptionParser parser, String brokerList) { - if (parser == null || brokerList == null) { - throw new RuntimeException("No option parser or broker list found"); - } - if (brokerList.isEmpty()) { - CommandLineUtils.printUsageAndExit(parser, "Empty broker list option"); + try { + ToolsUtils.validateBootstrapServer(brokerList); + } catch (IllegalArgumentException e) { + CommandLineUtils.printUsageAndExit(parser, e.getMessage()); } - String[] hostPorts; - if (brokerList.contains(",")) hostPorts = brokerList.split(","); - else hostPorts = new String[]{brokerList}; - - String[] validHostPort = Arrays.stream(hostPorts) - .filter(hostPortData -> Utils.getPort(hostPortData) != null) - .toArray(String[]::new); - - if (validHostPort.length == 0 || validHostPort.length != hostPorts.length) { - CommandLineUtils.printUsageAndExit(parser, "Invalid broker list option"); - } + return brokerList; } TopicFilter.IncludeList topicsIncludeFilter() { diff --git a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java index 1a391f33d673a..794f1022293c2 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java +++ b/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java @@ -18,8 +18,10 @@ import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.utils.Utils; import java.io.PrintStream; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -99,4 +101,26 @@ public static void prettyPrintTable( printRow(columnLengths, headers, out); rows.forEach(row -> printRow(columnLengths, row, out)); } + + public static void validateBootstrapServer(String hostPort) throws IllegalArgumentException { + if (hostPort == null || hostPort.trim().isEmpty()) { + throw new IllegalArgumentException("Error while validating the bootstrap address\n"); + } + + String[] hostPorts; + + if (hostPort.contains(",")) { + hostPorts = hostPort.split(","); + } else { + hostPorts = new String[] {hostPort}; + } + + String[] validHostPort = Arrays.stream(hostPorts) + .filter(hostPortData -> Utils.getPort(hostPortData) != null) + .toArray(String[]::new); + + if (validHostPort.length == 0 || validHostPort.length != hostPorts.length) { + throw new IllegalArgumentException("Please provide valid host:port like host1:9091,host2:9092\n"); + } + } } diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java new file mode 100644 index 0000000000000..9980a471c37da --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellParsingTest.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.server.util.TopicPartitionFilter; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class GetOffsetShellParsingTest { + GetOffsetShell getOffsetShell = new GetOffsetShell(); + + @Test + public void testTopicPartitionFilterForTopicName() { + TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList("test"); + + assertTrue(topicPartitionFilter.isTopicAllowed("test")); + assertFalse(topicPartitionFilter.isTopicAllowed("test1")); + assertFalse(topicPartitionFilter.isTopicAllowed("__consumer_offsets")); + + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 0))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 1))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test1", 0))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 0))); + } + + @Test + public void testTopicPartitionFilterForInternalTopicName() { + TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList("__consumer_offsets"); + + assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets")); + assertFalse(topicPartitionFilter.isTopicAllowed("test1")); + assertFalse(topicPartitionFilter.isTopicAllowed("test2")); + + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 0))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 1))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test1", 0))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test2", 0))); + } + + + @Test + public void testTopicPartitionFilterForTopicNameList() { + TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList("test,test1,__consumer_offsets"); + + assertTrue(topicPartitionFilter.isTopicAllowed("test")); + assertTrue(topicPartitionFilter.isTopicAllowed("test1")); + assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets")); + assertFalse(topicPartitionFilter.isTopicAllowed("test2")); + + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 0))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test1", 1))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 0))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test2", 0))); + } + + + @Test + public void testTopicPartitionFilterForRegex() { + TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList("test.*"); + + assertTrue(topicPartitionFilter.isTopicAllowed("test")); + assertTrue(topicPartitionFilter.isTopicAllowed("test1")); + assertTrue(topicPartitionFilter.isTopicAllowed("test2")); + assertFalse(topicPartitionFilter.isTopicAllowed("__consumer_offsets")); + + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 0))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test1", 1))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test2", 0))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 0))); + } + + @Test + public void testTopicPartitionFilterForPartitionIndexSpec() { + TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList(":0"); + + assertTrue(topicPartitionFilter.isTopicAllowed("test")); + assertTrue(topicPartitionFilter.isTopicAllowed("test1")); + assertTrue(topicPartitionFilter.isTopicAllowed("test2")); + assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets")); + + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 0))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test1", 0))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test2", 1))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 0))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 1))); + } + + @Test + public void testTopicPartitionFilterForPartitionRangeSpec() { + TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList(":1-3"); + + assertTrue(topicPartitionFilter.isTopicAllowed("test")); + assertTrue(topicPartitionFilter.isTopicAllowed("test1")); + assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets")); + assertTrue(topicPartitionFilter.isTopicAllowed("test2")); + + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 1))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test1", 2))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 2))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test2", 0))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test2", 3))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 3))); + } + + @Test + public void testTopicPartitionFilterForPartitionLowerBoundSpec() { + TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList(":1-"); + + assertTrue(topicPartitionFilter.isTopicAllowed("test")); + assertTrue(topicPartitionFilter.isTopicAllowed("test1")); + assertTrue(topicPartitionFilter.isTopicAllowed("test2")); + assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets")); + + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 1))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test1", 2))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 2))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test2", 0))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 0))); + } + + @Test + public void testTopicPartitionFilterForPartitionUpperBoundSpec() { + TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList(":-3"); + assertTrue(topicPartitionFilter.isTopicAllowed("test")); + assertTrue(topicPartitionFilter.isTopicAllowed("test1")); + assertTrue(topicPartitionFilter.isTopicAllowed("test2")); + assertTrue(topicPartitionFilter.isTopicAllowed("test3")); + assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets")); + + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 0))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test1", 1))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test2", 2))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 2))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test3", 3))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 3))); + } + + @Test + public void testTopicPartitionFilterComplex() { + TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList("test.*:0,__consumer_offsets:1-2,.*:3"); + + assertTrue(topicPartitionFilter.isTopicAllowed("test")); + assertTrue(topicPartitionFilter.isTopicAllowed("test1")); + assertTrue(topicPartitionFilter.isTopicAllowed("custom")); + assertTrue(topicPartitionFilter.isTopicAllowed("__consumer_offsets")); + + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 0))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test1", 0))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 1))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test1", 1))); + + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("custom", 3))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("custom", 0))); + + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 1))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 3))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 0))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("__consumer_offsets", 2))); + } + + @Test + public void testPartitionFilterForSingleIndex() { + TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList(":1"); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 1))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 0))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 2))); + } + + @Test + public void testPartitionFilterForRange() { + TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList(":1-3"); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 0))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 1))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 2))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 3))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 4))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 5))); + } + + @Test + public void testPartitionFilterForLowerBound() { + TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList(":3-"); + + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 0))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 1))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 2))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 3))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 4))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 5))); + } + + @Test + public void testPartitionFilterForUpperBound() { + TopicPartitionFilter topicPartitionFilter = getOffsetShell.createTopicPartitionFilterWithPatternList(":-3"); + + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 0))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 1))); + assertTrue(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 2))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 3))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 4))); + assertFalse(topicPartitionFilter.isTopicPartitionAllowed(getTopicPartition("test", 5))); + } + + @Test + public void testPartitionsSetFilter() throws TerseException { + TopicPartitionFilter partitionsSetFilter = getOffsetShell.createTopicPartitionFilterWithTopicAndPartitionPattern("topic", "1,3,5"); + + assertFalse(partitionsSetFilter.isTopicPartitionAllowed(getTopicPartition("topic", 0))); + assertFalse(partitionsSetFilter.isTopicPartitionAllowed(getTopicPartition("topic", 2))); + assertFalse(partitionsSetFilter.isTopicPartitionAllowed(getTopicPartition("topic", 4))); + + assertFalse(partitionsSetFilter.isTopicPartitionAllowed(getTopicPartition("topic1", 1))); + assertFalse(partitionsSetFilter.isTopicAllowed("topic1")); + + assertTrue(partitionsSetFilter.isTopicPartitionAllowed(getTopicPartition("topic", 1))); + assertTrue(partitionsSetFilter.isTopicPartitionAllowed(getTopicPartition("topic", 3))); + assertTrue(partitionsSetFilter.isTopicPartitionAllowed(getTopicPartition("topic", 5))); + assertTrue(partitionsSetFilter.isTopicAllowed("topic")); + } + + @Test + public void testInvalidTimeValue() { + assertThrows(TerseException.class, () -> GetOffsetShell.execute("--bootstrap-server", "localhost:9092", "--time", "invalid")); + } + + private TopicPartition getTopicPartition(String topic, Integer partition) { + return new TopicPartition(topic, partition); + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java new file mode 100644 index 0000000000000..417bbe711681a --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.tools; + +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTest; +import kafka.test.annotation.ClusterTestDefaults; +import kafka.test.annotation.Type; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Exit; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.Properties; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@ExtendWith(value = ClusterTestExtensions.class) +@ClusterTestDefaults(clusterType = Type.ZK) +@Tag("integration") +public class GetOffsetShellTest { + private final int topicCount = 4; + private final int offsetTopicPartitionCount = 4; + private final ClusterInstance cluster; + + public GetOffsetShellTest(ClusterInstance cluster) { + this.cluster = cluster; + } + + private String getTopicName(int i) { + return "topic" + i; + } + + public void setUp() { + cluster.config().serverProperties().put("auto.create.topics.enable", false); + cluster.config().serverProperties().put("offsets.topic.replication.factor", "1"); + cluster.config().serverProperties().put("offsets.topic.num.partitions", String.valueOf(offsetTopicPartitionCount)); + + try (Admin admin = Admin.create(cluster.config().adminClientProperties())) { + List topics = new ArrayList<>(); + + IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new NewTopic(getTopicName(i), i, (short) 1))); + + admin.createTopics(topics); + } + + Properties props = new Properties(); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.config().producerProperties().get("bootstrap.servers")); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + try (KafkaProducer producer = new KafkaProducer<>(props)) { + IntStream.range(0, topicCount + 1) + .forEach(i -> IntStream.range(0, i * i) + .forEach(msgCount -> producer.send( + new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount))) + ); + } + } + + static class Row { + private String name; + private int partition; + private Long timestamp; + + public Row(String name, int partition, Long timestamp) { + this.name = name; + this.partition = partition; + this.timestamp = timestamp; + } + + @Override + public boolean equals(Object o) { + if (o == this) return true; + + if (!(o instanceof Row)) return false; + + Row r = (Row) o; + + return name.equals(r.name) && partition == r.partition && Objects.equals(timestamp, r.timestamp); + } + + @Override + public int hashCode() { + return Objects.hash(name, partition, timestamp); + } + } + + @ClusterTest + public void testNoFilterOptions() { + setUp(); + + List output = executeAndParse(); + + assertEquals(expectedOffsetsWithInternal(), output); + } + + @ClusterTest + public void testInternalExcluded() { + setUp(); + + List output = executeAndParse("--exclude-internal-topics"); + + assertEquals(expectedTestTopicOffsets(), output); + } + + @ClusterTest + public void testTopicNameArg() { + setUp(); + + IntStream.range(1, topicCount + 1).forEach(i -> { + List offsets = executeAndParse("--topic", getTopicName(i)); + + assertEquals(expectedOffsetsForTopic(i), offsets, () -> "Offset output did not match for " + getTopicName(i)); + }); + } + + @ClusterTest + public void testTopicPatternArg() { + setUp(); + + List offsets = executeAndParse("--topic", "topic.*"); + + assertEquals(expectedTestTopicOffsets(), offsets); + } + + @ClusterTest + public void testPartitionsArg() { + setUp(); + + List offsets = executeAndParse("--partitions", "0,1"); + + assertEquals(expectedOffsetsWithInternal().stream().filter(r -> r.partition <= 1).collect(Collectors.toList()), offsets); + } + + @ClusterTest + public void testTopicPatternArgWithPartitionsArg() { + setUp(); + + List offsets = executeAndParse("--topic", "topic.*", "--partitions", "0,1"); + + assertEquals(expectedTestTopicOffsets().stream().filter(r -> r.partition <= 1).collect(Collectors.toList()), offsets); + } + + @ClusterTest + public void testTopicPartitionsArg() { + setUp(); + + List offsets = executeAndParse("--topic-partitions", "topic1:0,topic2:1,topic(3|4):2,__.*:3"); + List expected = Arrays.asList( + new Row("__consumer_offsets", 3, 0L), + new Row("topic1", 0, 1L), + new Row("topic2", 1, 2L), + new Row("topic3", 2, 3L), + new Row("topic4", 2, 4L) + ); + + assertEquals(expected, offsets); + } + + @ClusterTest + public void testGetLatestOffsets() { + setUp(); + + for (String time : new String[] {"-1", "latest"}) { + List offsets = executeAndParse("--topic-partitions", "topic.*:0", "--time", time); + List expected = Arrays.asList( + new Row("topic1", 0, 1L), + new Row("topic2", 0, 2L), + new Row("topic3", 0, 3L), + new Row("topic4", 0, 4L) + ); + + assertEquals(expected, offsets); + } + } + + @ClusterTest + public void testGetEarliestOffsets() { + setUp(); + + for (String time : new String[] {"-2", "earliest"}) { + List offsets = executeAndParse("--topic-partitions", "topic.*:0", "--time", time); + List expected = Arrays.asList( + new Row("topic1", 0, 0L), + new Row("topic2", 0, 0L), + new Row("topic3", 0, 0L), + new Row("topic4", 0, 0L) + ); + + assertEquals(expected, offsets); + } + } + + @ClusterTest + public void testGetOffsetsByMaxTimestamp() { + setUp(); + + for (String time : new String[] {"-3", "max-timestamp"}) { + List offsets = executeAndParse("--topic-partitions", "topic.*", "--time", time); + + offsets.forEach( + row -> assertTrue(row.timestamp >= 0 && row.timestamp <= Integer.parseInt(row.name.replace("topic", ""))) + ); + } + } + + @ClusterTest + public void testGetOffsetsByTimestamp() { + setUp(); + + String time = String.valueOf(System.currentTimeMillis() / 2); + + List offsets = executeAndParse("--topic-partitions", "topic.*:0", "--time", time); + List expected = Arrays.asList( + new Row("topic1", 0, 0L), + new Row("topic2", 0, 0L), + new Row("topic3", 0, 0L), + new Row("topic4", 0, 0L) + ); + + assertEquals(expected, offsets); + } + + @ClusterTest + public void testNoOffsetIfTimestampGreaterThanLatestRecord() { + setUp(); + + String time = String.valueOf(System.currentTimeMillis() * 2); + + List offsets = executeAndParse("--topic-partitions", "topic.*", "--time", time); + + assertEquals(new ArrayList(), offsets); + } + + @ClusterTest + public void testTopicPartitionsArgWithInternalExcluded() { + setUp(); + + List offsets = executeAndParse("--topic-partitions", "topic1:0,topic2:1,topic(3|4):2,__.*:3", "--exclude-internal-topics"); + List expected = Arrays.asList( + new Row("topic1", 0, 1L), + new Row("topic2", 1, 2L), + new Row("topic3", 2, 3L), + new Row("topic4", 2, 4L) + ); + + assertEquals(expected, offsets); + } + + @ClusterTest + public void testTopicPartitionsArgWithInternalIncluded() { + setUp(); + + List offsets = executeAndParse("--topic-partitions", "__.*:0"); + + assertEquals(Arrays.asList(new Row("__consumer_offsets", 0, 0L)), offsets); + } + + @ClusterTest + public void testTopicPartitionsNotFoundForNonExistentTopic() { + assertExitCodeIsOne("--topic", "some_nonexistent_topic"); + } + + @ClusterTest + public void testTopicPartitionsNotFoundForExcludedInternalTopic() { + assertExitCodeIsOne("--topic", "some_nonexistent_topic:*"); + } + + @ClusterTest + public void testTopicPartitionsNotFoundForNonMatchingTopicPartitionPattern() { + assertExitCodeIsOne("--topic-partitions", "__consumer_offsets", "--exclude-internal-topics"); + } + + @ClusterTest + public void testTopicPartitionsFlagWithTopicFlagCauseExit() { + assertExitCodeIsOne("--topic-partitions", "__consumer_offsets", "--topic", "topic1"); + } + + @ClusterTest + public void testTopicPartitionsFlagWithPartitionsFlagCauseExit() { + assertExitCodeIsOne("--topic-partitions", "__consumer_offsets", "--partitions", "0"); + } + + private void assertExitCodeIsOne(String... args) { + final int[] exitStatus = new int[1]; + + Exit.setExitProcedure((statusCode, message) -> { + exitStatus[0] = statusCode; + + throw new RuntimeException(); + }); + + try { + GetOffsetShell.main(addBootstrapServer(args)); + } catch (RuntimeException ignored) { + + } finally { + Exit.resetExitProcedure(); + } + + assertEquals(1, exitStatus[0]); + } + + private List expectedOffsetsWithInternal() { + List consOffsets = IntStream.range(0, offsetTopicPartitionCount + 1) + .mapToObj(i -> new Row("__consumer_offsets", i, 0L)) + .collect(Collectors.toList()); + + return Stream.concat(consOffsets.stream(), expectedTestTopicOffsets().stream()).collect(Collectors.toList()); + } + + private List expectedTestTopicOffsets() { + List offsets = new ArrayList<>(topicCount + 1); + + for (int i = 0; i < topicCount + 1; i++) { + offsets.addAll(expectedOffsetsForTopic(i)); + } + + return offsets; + } + + private List expectedOffsetsForTopic(int i) { + String name = getTopicName(i); + + return IntStream.range(0, i).mapToObj(p -> new Row(name, p, (long) i)).collect(Collectors.toList()); + } + + private List executeAndParse(String... args) { + String out = ToolsTestUtils.captureStandardOut(() -> GetOffsetShell.mainNoExit(addBootstrapServer(args))); + + return Arrays.stream(out.split(System.lineSeparator())) + .map(i -> i.split(":")) + .filter(i -> i.length >= 2) + .map(line -> new Row(line[0], Integer.parseInt(line[1]), (line.length == 2 || line[2].isEmpty()) ? null : Long.parseLong(line[2]))) + .collect(Collectors.toList()); + } + + private String[] addBootstrapServer(String... args) { + ArrayList newArgs = new ArrayList<>(Arrays.asList(args)); + newArgs.add("--bootstrap-server"); + newArgs.add(cluster.bootstrapServers()); + + return newArgs.toArray(new String[0]); + } +}