From 657ad738bcac9d81c3e2939ff10589bb7d9ab1db Mon Sep 17 00:00:00 2001 From: Zijie Lu Date: Thu, 5 Nov 2020 14:08:01 +0800 Subject: [PATCH 1/3] ddl: defining the placement rule of a leader requires `REPLICAS` option (#20813) --- ddl/ddl_api.go | 6 ++++ ddl/placement_sql_test.go | 71 ++++++++++++++++++++++----------------- 2 files changed, 46 insertions(+), 31 deletions(-) diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 6f515d0a9c9c7..3917511e951e5 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -5846,6 +5846,12 @@ func buildPlacementSpecs(bundle *placement.Bundle, specs []*ast.PlacementSpec) ( case ast.PlacementRoleFollower: role = placement.Follower case ast.PlacementRoleLeader: + if spec.Replicas == 0 { + spec.Replicas = 1 + } + if spec.Replicas > 1 { + err = errors.Errorf("replicas can only be 1 when the role is leader") + } role = placement.Leader case ast.PlacementRoleLearner: role = placement.Learner diff --git a/ddl/placement_sql_test.go b/ddl/placement_sql_test.go index fba9b6962fbd2..3d1e0503585a6 100644 --- a/ddl/placement_sql_test.go +++ b/ddl/placement_sql_test.go @@ -41,42 +41,42 @@ PARTITION BY RANGE (c) ( _, err := tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='["+zone=sh"]' - role=leader + role=follower replicas=3`) c.Assert(err, IsNil) _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='["+ zone = sh ", "- zone = bj "]' - role=leader + role=follower replicas=3`) c.Assert(err, IsNil) _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='{"+ zone = sh ": 1}' - role=leader + role=follower replicas=3`) c.Assert(err, IsNil) _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='{"+ zone = sh, -zone = bj ": 1}' - role=leader + role=follower replicas=3`) c.Assert(err, IsNil) _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='{"+ zone = sh ": 1, "- zone = bj": 2}' - role=leader + role=follower replicas=3`) c.Assert(err, IsNil) _, err = tk.Exec(`alter table t1 alter partition p0 alter placement policy constraints='{"+ zone = sh, -zone = bj ": 1}' - role=leader + role=follower replicas=3`) c.Assert(err, IsNil) @@ -89,52 +89,52 @@ drop placement policy _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='["+ zone = sh "]' - role=leader + role=follower replicas=3, add placement policy constraints='{"+ zone = sh, -zone = bj ": 1}' - role=leader + role=follower replicas=3`) c.Assert(err, IsNil) _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='["+ zone = sh "]' - role=leader + role=follower replicas=3, add placement policy constraints='{"+zone=sh,+zone=bj":1,"+zone=sh,+zone=bj":1}' - role=leader + role=follower replicas=3`) c.Assert(err, IsNil) _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='{"+ zone = sh ": 1, "- zone = bj,+zone=sh": 2}' - role=leader + role=follower replicas=3, alter placement policy constraints='{"+ zone = sh, -zone = bj ": 1}' - role=leader + role=follower replicas=3`) c.Assert(err, IsNil) _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='["+zone=sh", "-zone=bj"]' - role=leader + role=follower replicas=3, add placement policy constraints='{"+zone=sh": 1}' - role=leader + role=follower replicas=3, add placement policy constraints='{"+zone=sh,+zone=bj":1,"+zone=sh,+zone=bj":1}' - role=leader + role=follower replicas=3, alter placement policy constraints='{"+zone=sh": 1, "-zon =bj,+zone=sh": 1}' - role=leader + role=follower replicas=3`) c.Assert(err, IsNil) @@ -158,28 +158,28 @@ drop placement policy _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints=',,,' - role=leader + role=follower replicas=3`) c.Assert(err, ErrorMatches, ".*array or object.*") _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='[,,,' - role=leader + role=follower replicas=3`) c.Assert(err, ErrorMatches, ".*invalid character.*") _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='{,,,' - role=leader + role=follower replicas=3`) c.Assert(err, ErrorMatches, ".*invalid character.*") _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='{"+ zone = sh ": 1, "- zone = bj": 2}' - role=leader + role=follower replicas=2`) c.Assert(err, ErrorMatches, ".*should be larger or equal to the number of total replicas.*") @@ -187,14 +187,14 @@ add placement policy _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='[",,,"]' - role=leader + role=follower replicas=3`) c.Assert(err, ErrorMatches, ".*label constraint should be in format.*") _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='["+ "]' - role=leader + role=follower replicas=3`) c.Assert(err, ErrorMatches, ".*label constraint should be in format.*") @@ -202,7 +202,7 @@ add placement policy _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='["0000"]' - role=leader + role=follower replicas=3`) c.Assert(err, ErrorMatches, ".*label constraint should be in format.*") @@ -210,7 +210,7 @@ add placement policy _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='["+000"]' - role=leader + role=follower replicas=3`) c.Assert(err, ErrorMatches, ".*label constraint should be in format.*") @@ -218,14 +218,14 @@ add placement policy _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='["+ =zone1"]' - role=leader + role=follower replicas=3`) c.Assert(err, ErrorMatches, ".*label constraint should be in format.*") _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='["+ = z"]' - role=leader + role=follower replicas=3`) c.Assert(err, ErrorMatches, ".*label constraint should be in format.*") @@ -233,28 +233,28 @@ add placement policy _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='["+zone="]' - role=leader + role=follower replicas=3`) c.Assert(err, ErrorMatches, ".*label constraint should be in format.*") _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='["+z = "]' - role=leader + role=follower replicas=3`) c.Assert(err, ErrorMatches, ".*label constraint should be in format.*") _, err = tk.Exec(`alter table t1 alter partition p add placement policy constraints='["+zone=sh"]' - role=leader + role=follower replicas=3`) c.Assert(err, ErrorMatches, ".*Unknown partition.*") _, err = tk.Exec(`alter table t1 alter partition p0 add placement policy constraints='{"+ zone = sh, -zone = bj ": -1}' - role=leader + role=follower replicas=3`) c.Assert(err, ErrorMatches, ".*count should be positive.*") @@ -271,9 +271,18 @@ add placement policy _, err = tk.Exec(`alter table t1 alter partition p add placement policy constraints='["+zone=sh"]' - role=leader + role=follower replicas=3`) c.Assert(ddl.ErrPartitionMgmtOnNonpartitioned.Equal(err), IsTrue) + + // issue 20751 + tk.MustExec("drop table if exists t_part_pk_id") + tk.MustExec("create TABLE t_part_pk_id (c1 int,c2 int) partition by range(c1 div c2 ) (partition p0 values less than (2))") + _, err = tk.Exec(`alter table t_part_pk_id alter partition p0 add placement policy constraints='["+host=store1"]' role=leader;`) + c.Assert(err, IsNil) + _, err = tk.Exec(`alter table t_part_pk_id alter partition p0 add placement policy constraints='["+host=store1"]' role=leader replicas=3;`) + c.Assert(err, ErrorMatches, ".*replicas can only be 1 when the role is leader") + tk.MustExec("drop table t_part_pk_id") } func (s *testDBSuite1) TestPlacementPolicyCache(c *C) { From 4e1cbca5c2a3240d3549ee254f814bd567e8f6e0 Mon Sep 17 00:00:00 2001 From: "Zhuomin(Charming) Liu" Date: Thu, 5 Nov 2020 14:21:35 +0800 Subject: [PATCH 2/3] executor: fix panic when set partition handle for index join (#20551) --- executor/builder.go | 10 ++++++---- executor/index_lookup_merge_join_test.go | 12 ++++++++++++ 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 97fba24aa907d..9b514f50d8384 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -3347,10 +3347,12 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex }) } var b distsql.RequestBuilder - if _, ok := handles[0].(kv.PartitionHandle); ok { - b.SetPartitionsAndHandles(handles) - } else { - b.SetTableHandles(getPhysicalTableID(e.table), handles) + if len(handles) > 0 { + if _, ok := handles[0].(kv.PartitionHandle); ok { + b.SetPartitionsAndHandles(handles) + } else { + b.SetTableHandles(getPhysicalTableID(e.table), handles) + } } return builder.buildTableReaderBase(ctx, e, b) } diff --git a/executor/index_lookup_merge_join_test.go b/executor/index_lookup_merge_join_test.go index c0bb253cec963..8f6bb8e1f4a17 100644 --- a/executor/index_lookup_merge_join_test.go +++ b/executor/index_lookup_merge_join_test.go @@ -146,3 +146,15 @@ func (s *testSuite9) TestIssue20400(c *C) { tk.MustQuery("select /*+ inl_merge_join(t,s)*/ * from t left join s on t.a=s.a and t.a>1").Check( testkit.Rows("1 ")) } + +func (s *testSuite9) TestIssue20549(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t1, t2") + tk.MustExec("CREATE TABLE `t1` (`id` bigint(20) NOT NULL AUTO_INCREMENT, `t2id` bigint(20) DEFAULT NULL, PRIMARY KEY (`id`), KEY `t2id` (`t2id`));") + tk.MustExec("INSERT INTO `t1` VALUES (1,NULL);") + tk.MustExec("CREATE TABLE `t2` (`id` bigint(20) NOT NULL AUTO_INCREMENT, PRIMARY KEY (`id`));") + tk.MustQuery("SELECT /*+ INL_MERGE_JOIN(t1,t2) */ 1 from t1 left outer join t2 on t1.t2id=t2.id;").Check( + testkit.Rows("1")) + tk.MustQuery("SELECT /*+ HASH_JOIN(t1,t2) */ 1 from t1 left outer join t2 on t1.t2id=t2.id;\n").Check( + testkit.Rows("1")) +} From 8c418b9773504ebb62db7358447ce77970c15e66 Mon Sep 17 00:00:00 2001 From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Date: Thu, 5 Nov 2020 14:35:31 +0800 Subject: [PATCH 3/3] store/tikv: Fix goroutine leak in tikv client (#20808) Signed-off-by: MyonKeminta --- store/tikv/kv.go | 4 ++++ store/tikv/safepoint.go | 11 +++++++++++ 2 files changed, 15 insertions(+) diff --git a/store/tikv/kv.go b/store/tikv/kv.go index 4072ab92c9183..4ea21cfd0227c 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -367,6 +367,10 @@ func (s *tikvStore) Close() error { if s.coprCache != nil { s.coprCache.cache.Close() } + + if err := s.kv.Close(); err != nil { + return errors.Trace(err) + } return nil } diff --git a/store/tikv/safepoint.go b/store/tikv/safepoint.go index c6b119fd2af56..0e3d8de44495b 100644 --- a/store/tikv/safepoint.go +++ b/store/tikv/safepoint.go @@ -46,6 +46,7 @@ type SafePointKV interface { Put(k string, v string) error Get(k string) (string, error) GetWithPrefix(k string) ([]*mvccpb.KeyValue, error) + Close() error } // MockSafePointKV implements SafePointKV at mock test @@ -90,6 +91,11 @@ func (w *MockSafePointKV) GetWithPrefix(prefix string) ([]*mvccpb.KeyValue, erro return kvs, nil } +// Close implements the Close method for SafePointKV +func (w *MockSafePointKV) Close() error { + return nil +} + // EtcdSafePointKV implements SafePointKV at runtime type EtcdSafePointKV struct { cli *clientv3.Client @@ -140,6 +146,11 @@ func (w *EtcdSafePointKV) GetWithPrefix(k string) ([]*mvccpb.KeyValue, error) { return resp.Kvs, nil } +// Close implements the Close for SafePointKV +func (w *EtcdSafePointKV) Close() error { + return errors.Trace(w.cli.Close()) +} + func saveSafePoint(kv SafePointKV, t uint64) error { s := strconv.FormatUint(t, 10) err := kv.Put(GcSavedSafePoint, s)