From 2afe350db74b9d893ecdd5b7b6b9f1f6510337a3 Mon Sep 17 00:00:00 2001 From: birdstorm Date: Mon, 9 Apr 2018 17:02:41 +0800 Subject: [PATCH] TISPARK-21 delete redundant dag column that affects index single read (#304) (#307) --- .../org/apache/spark/sql/TiStrategy.scala | 3 -- .../apache/spark/sql/BaseTiSparkSuite.scala | 2 +- .../org/apache/spark/sql/IssueTestSuite.scala | 53 ++++++++++--------- 3 files changed, 29 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/TiStrategy.scala b/core/src/main/scala/org/apache/spark/sql/TiStrategy.scala index 702bb85f10..0a32b9501f 100644 --- a/core/src/main/scala/org/apache/spark/sql/TiStrategy.scala +++ b/core/src/main/scala/org/apache/spark/sql/TiStrategy.scala @@ -111,9 +111,6 @@ class TiStrategy(context: SQLContext) extends Strategy with Logging { ): SparkPlan = { val table = source.table dagRequest.setTableInfo(table) - if (dagRequest.getFields.isEmpty) { - dagRequest.addRequiredColumn(ColumnRef.create(table.getColumns.get(0).getName)) - } // Need to resolve column info after add aggregation push downs dagRequest.resolve() diff --git a/core/src/test/scala/org/apache/spark/sql/BaseTiSparkSuite.scala b/core/src/test/scala/org/apache/spark/sql/BaseTiSparkSuite.scala index 824c403ca3..8b7274aa49 100644 --- a/core/src/test/scala/org/apache/spark/sql/BaseTiSparkSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/BaseTiSparkSuite.scala @@ -120,7 +120,7 @@ class BaseTiSparkSuite extends QueryTest with SharedSQLContext { def execDBTSAndJudge(str: String, skipped: Boolean = false): Boolean = try { - compResult(querySpark(str), queryTiDB(str)) + compResult(querySpark(str), queryTiDB(str), str.contains(" order by ")) } catch { case e: Throwable => fail(e) } diff --git a/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala b/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala index ec9663af16..8d3423882e 100644 --- a/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/IssueTestSuite.scala @@ -19,6 +19,33 @@ import org.apache.spark.sql.functions.{col, sum} class IssueTestSuite extends BaseTiSparkSuite { + test("TISPARK-21 count(1) when single read results in DAGRequestException") { + tidbStmt.execute("DROP TABLE IF EXISTS `single_read`") + tidbStmt.execute( + """CREATE TABLE `single_read` ( + | `c1` int(11) NOT NULL, + | `c2` int(11) NOT NULL, + | `c3` int(11) NOT NULL, + | `c4` int(11) NOT NULL, + | `c5` int(11) DEFAULT NULL, + | PRIMARY KEY (`c3`,`c2`), + | KEY `c4` (`c4`) + |) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin""".stripMargin + ) + tidbStmt.execute( + "insert into single_read values(1, 1, 1, 2, null), (1, 2, 1, 1, null), (2, 1, 3, 2, null), (2, 2, 2, 1, 0)" + ) + refreshConnections() + + assert(execDBTSAndJudge("select count(1) from single_read")) + assert(execDBTSAndJudge("select count(c1) from single_read")) + assert(execDBTSAndJudge("select count(c2) from single_read")) + assert(execDBTSAndJudge("select count(c5) from single_read")) + assert(execDBTSAndJudge("select count(1) from single_read where c2 < 2")) + assert(execDBTSAndJudge("select c2, c3 from single_read")) + assert(execDBTSAndJudge("select c3, c4 from single_read")) + } + test("TISPARK-16 fix excessive dag column") { tidbStmt.execute("DROP TABLE IF EXISTS `t1`") tidbStmt.execute("DROP TABLE IF EXISTS `t2`") @@ -57,30 +84,6 @@ class IssueTestSuite extends BaseTiSparkSuite { project_df.show } - // https://github.com/pingcap/tispark/issues/272 - test("Prefix index read does not work correctly") { - tidbStmt.execute("DROP TABLE IF EXISTS `prefix`") - tidbStmt.execute( - "CREATE TABLE `prefix` (\n `a` int(11) NOT NULL,\n `b` varchar(55) DEFAULT NULL,\n `c` int(11) DEFAULT NULL,\n PRIMARY KEY (`a`),\n KEY `prefix_index` (`b`(2)),\n KEY `prefix_complex` (`a`, `b`(2))\n) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin" - ) - tidbStmt.execute( - "INSERT INTO `prefix` VALUES(1, \"bbb\", 3), (2, \"bbc\", 4), (3, \"bbb\", 5), (4, \"abc\", 6), (5, \"abc\", 7), (6, \"abc\", 7)" - ) - tidbStmt.execute("ANALYZE TABLE `prefix`") - refreshConnections() - // add explain to show if we have actually used prefix index in plan - spark.sql("select a, b from prefix where b < \"bbc\"").explain - spark.sql("select a, b from prefix where a = 1 and b = \"bbb\"").explain - spark.sql("select b from prefix where b = \"bbc\"").explain - spark.sql("select b from prefix where b >= \"bbc\" and b < \"bbd\"").explain - spark.sql("select c, b from prefix where b > \"bb\" and b < \"bbc\"").explain - assert(execDBTSAndJudge("select a, b from prefix where b < \"bbc\"")) - assert(execDBTSAndJudge("select a, b from prefix where a = 1 and b = \"bbb\"")) - assert(execDBTSAndJudge("select b from prefix where b = \"bbc\"")) - assert(execDBTSAndJudge("select b from prefix where b >= \"bbc\" and b < \"bbd\"")) - assert(execDBTSAndJudge("select c, b from prefix where b = \"bb\" and b < \"bbc\"")) - } - // https://github.com/pingcap/tispark/issues/262 test("NPE when decoding datetime,date,timestamp") { tidbStmt.execute("DROP TABLE IF EXISTS `tmp_debug`") @@ -165,9 +168,9 @@ class IssueTestSuite extends BaseTiSparkSuite { try { tidbStmt.execute("drop table if exists t") tidbStmt.execute("drop table if exists tmp_debug") - tidbStmt.execute("drop table if exists prefix") tidbStmt.execute("drop table if exists t1") tidbStmt.execute("drop table if exists t2") + tidbStmt.execute("drop table if exists single_read") } finally { super.afterAll() }