diff --git a/ddl/ddl_tiflash_api.go b/ddl/ddl_tiflash_api.go new file mode 100644 index 0000000000000..1ade909b93ee9 --- /dev/null +++ b/ddl/ddl_tiflash_api.go @@ -0,0 +1,603 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright 2013 The ql Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSES/QL-LICENSE file. + +package ddl + +import ( + "bytes" + "container/list" + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + ddlutil "github.com/pingcap/tidb/ddl/util" + "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/store/helper" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/logutil" + atomicutil "go.uber.org/atomic" + "go.uber.org/zap" +) + +// TiFlashReplicaStatus records status for each TiFlash replica. +type TiFlashReplicaStatus struct { + ID int64 + Count uint64 + LocationLabels []string + Available bool + LogicalTableAvailable bool + HighPriority bool + IsPartition bool +} + +// TiFlashTick is type for backoff threshold. +type TiFlashTick float64 + +// PollTiFlashBackoffElement records backoff for each TiFlash Table. +// `Counter` increases every `Tick`, if it reached `Threshold`, it will be reset to 0 while `Threshold` grows. +// `TotalCounter` records total `Tick`s this element has since created. +type PollTiFlashBackoffElement struct { + Counter int + Threshold TiFlashTick + TotalCounter int +} + +// NewPollTiFlashBackoffElement initialize backoff element for a TiFlash table. +func NewPollTiFlashBackoffElement() *PollTiFlashBackoffElement { + return &PollTiFlashBackoffElement{ + Counter: 0, + Threshold: PollTiFlashBackoffMinTick, + TotalCounter: 0, + } +} + +// PollTiFlashBackoffContext is a collection of all backoff states. +type PollTiFlashBackoffContext struct { + MinThreshold TiFlashTick + MaxThreshold TiFlashTick + // Capacity limits tables a backoff pool can handle, in order to limit handling of big tables. + Capacity int + Rate TiFlashTick + elements map[int64]*PollTiFlashBackoffElement +} + +// NewPollTiFlashBackoffContext creates an instance of PollTiFlashBackoffContext. 
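+//
+// A minimal usage sketch (illustrative only; `tableID` is a placeholder):
+//
+//	b, err := NewPollTiFlashBackoffContext(PollTiFlashBackoffMinTick, PollTiFlashBackoffMaxTick, PollTiFlashBackoffCapacity, PollTiFlashBackoffRate)
+//	if err != nil {
+//		return err
+//	}
+//	if b.Put(tableID) {
+//		grew, exists, totalTicks := b.Tick(tableID)
+//		_, _, _ = grew, exists, totalTicks
+//	}
+//	b.Remove(tableID) // once the table is available again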
+func NewPollTiFlashBackoffContext(MinThreshold, MaxThreshold TiFlashTick, Capacity int, Rate TiFlashTick) (*PollTiFlashBackoffContext, error) { + if MaxThreshold < MinThreshold { + return nil, fmt.Errorf("`MaxThreshold` should always be larger than `MinThreshold`") + } + if MinThreshold < 1 { + return nil, fmt.Errorf("`MinThreshold` should not be less than 1") + } + if Capacity < 0 { + return nil, fmt.Errorf("negative `Capacity`") + } + if Rate <= 1 { + return nil, fmt.Errorf("`Rate` should always be larger than 1") + } + return &PollTiFlashBackoffContext{ + MinThreshold: MinThreshold, + MaxThreshold: MaxThreshold, + Capacity: Capacity, + elements: make(map[int64]*PollTiFlashBackoffElement), + Rate: Rate, + }, nil +} + +// TiFlashManagementContext is the context for TiFlash Replica Management +type TiFlashManagementContext struct { + TiFlashStores map[int64]helper.StoreStat + PollCounter uint64 + Backoff *PollTiFlashBackoffContext + // tables waiting for updating progress after become available. + UpdatingProgressTables *list.List +} + +// AvailableTableID is the table id info of available table for waiting to update TiFlash replica progress. +type AvailableTableID struct { + ID int64 + IsPartition bool +} + +// Tick will first check increase Counter. +// It returns: +// 1. A bool indicates whether threshold is grown during this tick. +// 2. A bool indicates whether this ID exists. +// 3. A int indicates how many ticks ID has counted till now. +func (b *PollTiFlashBackoffContext) Tick(ID int64) (bool, bool, int) { + e, ok := b.Get(ID) + if !ok { + return false, false, 0 + } + grew := e.MaybeGrow(b) + e.Counter++ + e.TotalCounter++ + return grew, true, e.TotalCounter +} + +// NeedGrow returns if we need to grow. +// It is exported for testing. +func (e *PollTiFlashBackoffElement) NeedGrow() bool { + return e.Counter >= int(e.Threshold) +} + +func (e *PollTiFlashBackoffElement) doGrow(b *PollTiFlashBackoffContext) { + if e.Threshold < b.MinThreshold { + e.Threshold = b.MinThreshold + } + if e.Threshold*b.Rate > b.MaxThreshold { + e.Threshold = b.MaxThreshold + } else { + e.Threshold *= b.Rate + } + e.Counter = 0 +} + +// MaybeGrow grows threshold and reset counter when needed. +func (e *PollTiFlashBackoffElement) MaybeGrow(b *PollTiFlashBackoffContext) bool { + if !e.NeedGrow() { + return false + } + e.doGrow(b) + return true +} + +// Remove will reset table from backoff. +func (b *PollTiFlashBackoffContext) Remove(ID int64) bool { + _, ok := b.elements[ID] + delete(b.elements, ID) + return ok +} + +// Get returns pointer to inner PollTiFlashBackoffElement. +// Only exported for test. +func (b *PollTiFlashBackoffContext) Get(ID int64) (*PollTiFlashBackoffElement, bool) { + res, ok := b.elements[ID] + return res, ok +} + +// Put will record table into backoff pool, if there is enough room, or returns false. +func (b *PollTiFlashBackoffContext) Put(ID int64) bool { + _, ok := b.elements[ID] + if ok { + return true + } else if b.Len() < b.Capacity { + b.elements[ID] = NewPollTiFlashBackoffElement() + return true + } + return false +} + +// Len gets size of PollTiFlashBackoffContext. +func (b *PollTiFlashBackoffContext) Len() int { + return len(b.elements) +} + +// NewTiFlashManagementContext creates an instance for TiFlashManagementContext. 
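+// The embedded backoff pool is built from the package-level defaults
+// (PollTiFlashBackoffMinTick, PollTiFlashBackoffMaxTick, PollTiFlashBackoffCapacity,
+// PollTiFlashBackoffRate), so construction only fails if those defaults are changed
+// to values rejected by NewPollTiFlashBackoffContext.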
+func NewTiFlashManagementContext() (*TiFlashManagementContext, error) { + c, err := NewPollTiFlashBackoffContext(PollTiFlashBackoffMinTick, PollTiFlashBackoffMaxTick, PollTiFlashBackoffCapacity, PollTiFlashBackoffRate) + if err != nil { + return nil, err + } + return &TiFlashManagementContext{ + PollCounter: 0, + TiFlashStores: make(map[int64]helper.StoreStat), + Backoff: c, + UpdatingProgressTables: list.New(), + }, nil +} + +var ( + // PollTiFlashInterval is the interval between every pollTiFlashReplicaStatus call. + PollTiFlashInterval = 2 * time.Second + // PullTiFlashPdTick indicates the number of intervals before we fully sync all TiFlash pd rules and tables. + PullTiFlashPdTick = atomicutil.NewUint64(30 * 5) + // UpdateTiFlashStoreTick indicates the number of intervals before we fully update TiFlash stores. + UpdateTiFlashStoreTick = atomicutil.NewUint64(5) + // PollTiFlashBackoffMaxTick is the max tick before we try to update TiFlash replica availability for one table. + PollTiFlashBackoffMaxTick TiFlashTick = 10 + // PollTiFlashBackoffMinTick is the min tick before we try to update TiFlash replica availability for one table. + PollTiFlashBackoffMinTick TiFlashTick = 1 + // PollTiFlashBackoffCapacity is the cache size of backoff struct. + PollTiFlashBackoffCapacity int = 1000 + // PollTiFlashBackoffRate is growth rate of exponential backoff threshold. + PollTiFlashBackoffRate TiFlashTick = 1.5 + // RefreshProgressMaxTableCount is the max count of table to refresh progress after available each poll. + RefreshProgressMaxTableCount uint64 = 1000 +) + +func getTiflashHTTPAddr(host string, statusAddr string) (string, error) { + configURL := fmt.Sprintf("%s://%s/config", + util.InternalHTTPSchema(), + statusAddr, + ) + resp, err := util.InternalHTTPClient().Get(configURL) + if err != nil { + return "", errors.Trace(err) + } + defer func() { + resp.Body.Close() + }() + + buf := new(bytes.Buffer) + _, err = buf.ReadFrom(resp.Body) + if err != nil { + return "", errors.Trace(err) + } + + var j map[string]interface{} + err = json.Unmarshal(buf.Bytes(), &j) + if err != nil { + return "", errors.Trace(err) + } + + engineStore, ok := j["engine-store"].(map[string]interface{}) + if !ok { + return "", errors.New("Error json") + } + port64, ok := engineStore["http_port"].(float64) + if !ok { + return "", errors.New("Error json") + } + port := int(port64) + + addr := fmt.Sprintf("%v:%v", host, port) + return addr, nil +} + +// LoadTiFlashReplicaInfo parses model.TableInfo into []TiFlashReplicaStatus. 
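+//
+// A typical call site (sketch) collects one status entry per physical table:
+//
+//	var statuses []TiFlashReplicaStatus
+//	LoadTiFlashReplicaInfo(tblInfo, &statuses)
+//
+// Partitioned tables contribute one entry per partition (including partitions that
+// are still being added); non-partitioned tables contribute a single entry.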
+func LoadTiFlashReplicaInfo(tblInfo *model.TableInfo, tableList *[]TiFlashReplicaStatus) { + if tblInfo.TiFlashReplica == nil { + // reject tables that has no tiflash replica such like `INFORMATION_SCHEMA` + return + } + if pi := tblInfo.GetPartitionInfo(); pi != nil { + for _, p := range pi.Definitions { + logutil.BgLogger().Debug(fmt.Sprintf("Table %v has partition %v\n", tblInfo.ID, p.ID)) + *tableList = append(*tableList, TiFlashReplicaStatus{p.ID, + tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID), tblInfo.TiFlashReplica.Available, false, true}) + } + // partitions that in adding mid-state + for _, p := range pi.AddingDefinitions { + logutil.BgLogger().Debug(fmt.Sprintf("Table %v has partition adding %v\n", tblInfo.ID, p.ID)) + *tableList = append(*tableList, TiFlashReplicaStatus{p.ID, tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID), tblInfo.TiFlashReplica.Available, true, true}) + } + } else { + logutil.BgLogger().Debug(fmt.Sprintf("Table %v has no partition\n", tblInfo.ID)) + *tableList = append(*tableList, TiFlashReplicaStatus{tblInfo.ID, tblInfo.TiFlashReplica.Count, tblInfo.TiFlashReplica.LocationLabels, tblInfo.TiFlashReplica.Available, tblInfo.TiFlashReplica.Available, false, false}) + } +} + +// UpdateTiFlashHTTPAddress report TiFlash's StatusAddress's port to Pd's etcd. +func (d *ddl) UpdateTiFlashHTTPAddress(store *helper.StoreStat) error { + addrAndPort := strings.Split(store.Store.StatusAddress, ":") + if len(addrAndPort) < 2 { + return errors.New("Can't get TiFlash Address from PD") + } + httpAddr, err := getTiflashHTTPAddr(addrAndPort[0], store.Store.StatusAddress) + if err != nil { + return errors.Trace(err) + } + // Report to pd + key := fmt.Sprintf("/tiflash/cluster/http_port/%v", store.Store.Address) + if d.etcdCli == nil { + return errors.New("no etcdCli in ddl") + } + origin := "" + resp, err := d.etcdCli.Get(d.ctx, key) + if err != nil { + return errors.Trace(err) + } + // Try to update. + for _, kv := range resp.Kvs { + if string(kv.Key) == key { + origin = string(kv.Value) + break + } + } + if origin != httpAddr { + logutil.BgLogger().Warn(fmt.Sprintf("Update status addr of %v from %v to %v", key, origin, httpAddr)) + err := ddlutil.PutKVToEtcd(d.ctx, d.etcdCli, 1, key, httpAddr) + if err != nil { + return errors.Trace(err) + } + } + + return nil +} + +func updateTiFlashStores(pollTiFlashContext *TiFlashManagementContext) error { + // We need the up-to-date information about TiFlash stores. + // Since TiFlash Replica synchronize may happen immediately after new TiFlash stores are added. + tikvStats, err := infosync.GetTiFlashStoresStat(context.Background()) + // If MockTiFlash is not set, will issue a MockTiFlashError here. 
+ if err != nil { + return err + } + pollTiFlashContext.TiFlashStores = make(map[int64]helper.StoreStat) + for _, store := range tikvStats.Stores { + for _, l := range store.Store.Labels { + if l.Key == "engine" && l.Value == "tiflash" { + pollTiFlashContext.TiFlashStores[store.Store.ID] = store + logutil.BgLogger().Debug("Found tiflash store", zap.Int64("id", store.Store.ID), zap.String("Address", store.Store.Address), zap.String("StatusAddress", store.Store.StatusAddress)) + } + } + } + logutil.BgLogger().Debug("updateTiFlashStores finished", zap.Int("TiFlash store count", len(pollTiFlashContext.TiFlashStores))) + return nil +} + +func pollAvailableTableProgress(schemas infoschema.InfoSchema, ctx sessionctx.Context, pollTiFlashContext *TiFlashManagementContext) { + pollMaxCount := RefreshProgressMaxTableCount + failpoint.Inject("PollAvailableTableProgressMaxCount", func(val failpoint.Value) { + pollMaxCount = uint64(val.(int)) + }) + for element := pollTiFlashContext.UpdatingProgressTables.Front(); element != nil && pollMaxCount > 0; pollMaxCount-- { + availableTableID := element.Value.(AvailableTableID) + var table table.Table + if availableTableID.IsPartition { + table, _, _ = schemas.FindTableByPartitionID(availableTableID.ID) + if table == nil { + logutil.BgLogger().Info("get table by partition failed, may be dropped or truncated", + zap.Int64("partitionID", availableTableID.ID), + ) + pollTiFlashContext.UpdatingProgressTables.Remove(element) + element = element.Next() + continue + } + } else { + var ok bool + table, ok = schemas.TableByID(availableTableID.ID) + if !ok { + logutil.BgLogger().Info("get table id failed, may be dropped or truncated", + zap.Int64("tableID", availableTableID.ID), + ) + pollTiFlashContext.UpdatingProgressTables.Remove(element) + element = element.Next() + continue + } + } + + tableInfo := table.Meta() + if tableInfo.TiFlashReplica == nil { + logutil.BgLogger().Info("table has no TiFlash replica", + zap.Int64("tableID or partitionID", availableTableID.ID), + zap.Bool("IsPartition", availableTableID.IsPartition), + ) + pollTiFlashContext.UpdatingProgressTables.Remove(element) + element = element.Next() + continue + } + progress, err := infosync.CalculateTiFlashProgress(availableTableID.ID, tableInfo.TiFlashReplica.Count, pollTiFlashContext.TiFlashStores) + if err != nil { + logutil.BgLogger().Error("get tiflash sync progress failed", + zap.Error(err), + zap.Int64("tableID", availableTableID.ID), + zap.Bool("IsPartition", availableTableID.IsPartition), + ) + continue + } + err = infosync.UpdateTiFlashProgressCache(availableTableID.ID, progress) + if err != nil { + logutil.BgLogger().Error("update tiflash sync progress cache failed", + zap.Error(err), + zap.Int64("tableID", availableTableID.ID), + zap.Bool("IsPartition", availableTableID.IsPartition), + zap.Float64("progress", progress), + ) + continue + } + next := element.Next() + pollTiFlashContext.UpdatingProgressTables.Remove(element) + element = next + } +} + +func (d *ddl) refreshTiFlashTicker(ctx sessionctx.Context, pollTiFlashContext *TiFlashManagementContext) error { + if pollTiFlashContext.PollCounter%UpdateTiFlashStoreTick.Load() == 0 { + if err := updateTiFlashStores(pollTiFlashContext); err != nil { + // If we failed to get from pd, retry everytime. 
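+			// Resetting PollCounter to 0 makes the next tick take the
+			// `PollCounter%UpdateTiFlashStoreTick == 0` branch again, so the store
+			// list refresh is retried immediately instead of after a full cycle.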
+ pollTiFlashContext.PollCounter = 0 + return err + } + } + + failpoint.Inject("OneTiFlashStoreDown", func() { + for storeID, store := range pollTiFlashContext.TiFlashStores { + store.Store.StateName = "Down" + pollTiFlashContext.TiFlashStores[storeID] = store + break + } + }) + pollTiFlashContext.PollCounter++ + + // Start to process every table. + schema := d.GetInfoSchemaWithInterceptor(ctx) + if schema == nil { + return errors.New("Schema is nil") + } + + pollAvailableTableProgress(schema, ctx, pollTiFlashContext) + + var tableList = make([]TiFlashReplicaStatus, 0) + + // Collect TiFlash Replica info, for every table. + for _, db := range schema.AllSchemas() { + tbls := schema.SchemaTables(db.Name) + for _, tbl := range tbls { + tblInfo := tbl.Meta() + LoadTiFlashReplicaInfo(tblInfo, &tableList) + } + } + + failpoint.Inject("waitForAddPartition", func(val failpoint.Value) { + for _, phyTable := range tableList { + is := d.infoCache.GetLatest() + _, ok := is.TableByID(phyTable.ID) + if !ok { + tb, _, _ := is.FindTableByPartitionID(phyTable.ID) + if tb == nil { + logutil.BgLogger().Info("[ddl] waitForAddPartition") + sleepSecond := val.(int) + time.Sleep(time.Duration(sleepSecond) * time.Second) + } + } + } + }) + + needPushPending := false + if pollTiFlashContext.UpdatingProgressTables.Len() == 0 { + needPushPending = true + } + + for _, tb := range tableList { + // For every region in each table, if it has one replica, we reckon it ready. + // These request can be batched as an optimization. + available := tb.Available + failpoint.Inject("PollTiFlashReplicaStatusReplacePrevAvailableValue", func(val failpoint.Value) { + available = val.(bool) + }) + // We only check unavailable tables here, so doesn't include blocked add partition case. + if !available && !tb.LogicalTableAvailable { + enabled, inqueue, _ := pollTiFlashContext.Backoff.Tick(tb.ID) + if inqueue && !enabled { + logutil.BgLogger().Info("Escape checking available status due to backoff", zap.Int64("tableId", tb.ID)) + continue + } + + progress, err := infosync.CalculateTiFlashProgress(tb.ID, tb.Count, pollTiFlashContext.TiFlashStores) + if err != nil { + logutil.BgLogger().Error("get tiflash sync progress failed", + zap.Error(err), + zap.Int64("tableID", tb.ID), + ) + continue + } + + err = infosync.UpdateTiFlashProgressCache(tb.ID, progress) + if err != nil { + logutil.BgLogger().Error("get tiflash sync progress from cache failed", + zap.Error(err), + zap.Int64("tableID", tb.ID), + zap.Bool("IsPartition", tb.IsPartition), + zap.Float64("progress", progress), + ) + continue + } + + avail := progress == 1 + failpoint.Inject("PollTiFlashReplicaStatusReplaceCurAvailableValue", func(val failpoint.Value) { + avail = val.(bool) + }) + + if !avail { + logutil.BgLogger().Info("Tiflash replica is not available", zap.Int64("tableID", tb.ID), zap.Float64("progress", progress)) + pollTiFlashContext.Backoff.Put(tb.ID) + } else { + logutil.BgLogger().Info("Tiflash replica is available", zap.Int64("tableID", tb.ID), zap.Float64("progress", progress)) + pollTiFlashContext.Backoff.Remove(tb.ID) + } + failpoint.Inject("skipUpdateTableReplicaInfoInLoop", func() { + failpoint.Continue() + }) + // Will call `onUpdateFlashReplicaStatus` to update `TiFlashReplica`. 
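+			// If the physical table was dropped, or is a partition still being added,
+			// the call below can return ErrTableNotExists; that case is only logged at
+			// info level rather than treated as an error.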
+ if err := d.UpdateTableReplicaInfo(ctx, tb.ID, avail); err != nil { + if infoschema.ErrTableNotExists.Equal(err) && tb.IsPartition { + // May be due to blocking add partition + logutil.BgLogger().Info("updating TiFlash replica status err, maybe false alarm by blocking add", zap.Error(err), zap.Int64("tableID", tb.ID), zap.Bool("isPartition", tb.IsPartition)) + } else { + logutil.BgLogger().Error("updating TiFlash replica status err", zap.Error(err), zap.Int64("tableID", tb.ID), zap.Bool("isPartition", tb.IsPartition)) + } + } + } else { + if needPushPending { + pollTiFlashContext.UpdatingProgressTables.PushFront(AvailableTableID{tb.ID, tb.IsPartition}) + } + } + } + + return nil +} + +func (d *ddl) PollTiFlashRoutine() { + pollTiflashContext, err := NewTiFlashManagementContext() + if err != nil { + logutil.BgLogger().Fatal("TiFlashManagement init failed", zap.Error(err)) + } + + hasSetTiFlashGroup := false + nextSetTiFlashGroupTime := time.Now() + for { + select { + case <-d.ctx.Done(): + return + case <-time.After(PollTiFlashInterval): + } + if d.IsTiFlashPollEnabled() { + if d.sessPool == nil { + logutil.BgLogger().Error("failed to get sessionPool for refreshTiFlashTicker") + return + } + failpoint.Inject("BeforeRefreshTiFlashTickeLoop", func() { + failpoint.Continue() + }) + + if !hasSetTiFlashGroup && !time.Now().Before(nextSetTiFlashGroupTime) { + // We should set tiflash rule group a higher index than other placement groups to forbid override by them. + // Once `SetTiFlashGroupConfig` succeed, we do not need to invoke it again. If failed, we should retry it util success. + if err = infosync.SetTiFlashGroupConfig(d.ctx); err != nil { + logutil.BgLogger().Warn("SetTiFlashGroupConfig failed", zap.Error(err)) + nextSetTiFlashGroupTime = time.Now().Add(time.Minute) + } else { + hasSetTiFlashGroup = true + } + } + + sctx, err := d.sessPool.get() + if err == nil { + if d.ownerManager.IsOwner() { + err := d.refreshTiFlashTicker(sctx, pollTiflashContext) + if err != nil { + switch err.(type) { + case *infosync.MockTiFlashError: + // If we have not set up MockTiFlash instance, for those tests without TiFlash, just suppress. + default: + logutil.BgLogger().Warn("refreshTiFlashTicker returns error", zap.Error(err)) + } + } + } else { + infosync.CleanTiFlashProgressCache() + } + d.sessPool.put(sctx) + } else { + if sctx != nil { + d.sessPool.put(sctx) + } + logutil.BgLogger().Error("failed to get session for pollTiFlashReplicaStatus", zap.Error(err)) + } + } + } +} diff --git a/ddl/tiflashtest/ddl_tiflash_test.go b/ddl/tiflashtest/ddl_tiflash_test.go new file mode 100644 index 0000000000000..c3ec3a1d2b0fb --- /dev/null +++ b/ddl/tiflashtest/ddl_tiflash_test.go @@ -0,0 +1,1356 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright 2013 The ql Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSES/QL-LICENSE file. 
+ +package tiflashtest + +import ( + "context" + "fmt" + "math" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/ddl/placement" + ddlutil "github.com/pingcap/tidb/ddl/util" + "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/domain/infosync" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/store/gcworker" + "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/store/mockstore/unistore" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/util" + "github.com/pingcap/tidb/util/logutil" + "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/oracle" + "github.com/tikv/client-go/v2/testutils" + "go.uber.org/zap" +) + +type tiflashContext struct { + store kv.Storage + dom *domain.Domain + tiflash *infosync.MockTiFlash + cluster *unistore.Cluster +} + +const ( + RoundToBeAvailable = 2 + RoundToBeAvailablePartitionTable = 3 +) + +func createTiFlashContext(t *testing.T) (*tiflashContext, func()) { + s := &tiflashContext{} + var err error + + ddl.PollTiFlashInterval = 1000 * time.Millisecond + ddl.PullTiFlashPdTick.Store(60) + s.tiflash = infosync.NewMockTiFlash() + s.store, err = mockstore.NewMockStore( + mockstore.WithClusterInspector(func(c testutils.Cluster) { + mockCluster := c.(*unistore.Cluster) + _, _, region1 := mockstore.BootstrapWithSingleStore(c) + tiflashIdx := 0 + for tiflashIdx < 2 { + store2 := c.AllocID() + peer2 := c.AllocID() + addr2 := fmt.Sprintf("tiflash%d", tiflashIdx) + s.tiflash.AddStore(store2, addr2) + mockCluster.AddStore(store2, addr2, &metapb.StoreLabel{Key: "engine", Value: "tiflash"}) + mockCluster.AddPeer(region1, store2, peer2) + tiflashIdx++ + } + s.cluster = mockCluster + }), + mockstore.WithStoreType(mockstore.EmbedUnistore), + ) + + require.NoError(t, err) + session.SetSchemaLease(0) + session.DisableStats4Test() + s.dom, err = session.BootstrapSession(s.store) + infosync.SetMockTiFlash(s.tiflash) + require.NoError(t, err) + s.dom.SetStatsUpdating(true) + + tearDown := func() { + s.tiflash.Lock() + s.tiflash.StatusServer.Close() + s.tiflash.Unlock() + s.dom.Close() + require.NoError(t, s.store.Close()) + ddl.PollTiFlashInterval = 2 * time.Second + } + return s, tearDown +} + +func ChangeGCSafePoint(tk *testkit.TestKit, t time.Time, enable string, lifeTime string) { + gcTimeFormat := "20060102-15:04:05 -0700 MST" + lastSafePoint := t.Format(gcTimeFormat) + s := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '') + ON DUPLICATE KEY + UPDATE variable_value = '%[1]s'` + s = fmt.Sprintf(s, lastSafePoint) + tk.MustExec(s) + s = `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_enable','%[1]s','') + ON DUPLICATE KEY + UPDATE variable_value = '%[1]s'` + s = fmt.Sprintf(s, enable) + tk.MustExec(s) + s = `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_life_time','%[1]s','') + ON DUPLICATE KEY + UPDATE variable_value = '%[1]s'` + s = fmt.Sprintf(s, lifeTime) + tk.MustExec(s) +} + +func (s *tiflashContext) CheckFlashback(tk *testkit.TestKit, t *testing.T) { + // If table is dropped after tikv_gc_safe_point, it can be recovered + ChangeGCSafePoint(tk, time.Now().Add(-time.Hour), "false", "10m0s") + defer func() { + ChangeGCSafePoint(tk, time.Now(), "true", "10m0s") + }() + + fCancel := TempDisableEmulatorGC() + defer fCancel() + 
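+	// Drop the table and flash it back; after a few poll rounds the replica should be
+	// reported available again and its PD placement rules should still be present.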
tk.MustExec("drop table if exists ddltiflash") + tk.MustExec("flashback table ddltiflash") + time.Sleep(ddl.PollTiFlashInterval * 3) + CheckTableAvailable(s.dom, t, 1, []string{}) + + tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + require.NotNil(t, tb) + if tb.Meta().Partition != nil { + for _, e := range tb.Meta().Partition.Definitions { + ruleName := fmt.Sprintf("table-%v-r", e.ID) + _, ok := s.tiflash.GetPlacementRule(ruleName) + require.True(t, ok) + } + } else { + ruleName := fmt.Sprintf("table-%v-r", tb.Meta().ID) + _, ok := s.tiflash.GetPlacementRule(ruleName) + require.True(t, ok) + } +} + +func TempDisableEmulatorGC() func() { + ori := ddlutil.IsEmulatorGCEnable() + f := func() { + if ori { + ddlutil.EmulatorGCEnable() + } else { + ddlutil.EmulatorGCDisable() + } + } + ddlutil.EmulatorGCDisable() + return f +} + +func (s *tiflashContext) SetPdLoop(tick uint64) func() { + originValue := ddl.PullTiFlashPdTick.Swap(tick) + return func() { + ddl.PullTiFlashPdTick.Store(originValue) + } +} + +// Run all kinds of DDLs, and will create no redundant pd rules for TiFlash. +func TestTiFlashNoRedundantPDRules(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + + rpcClient, pdClient, cluster, err := unistore.New("") + require.NoError(t, err) + defer func() { + rpcClient.Close() + pdClient.Close() + cluster.Close() + }() + for _, store := range s.cluster.GetAllStores() { + cluster.AddStore(store.Id, store.Address, store.Labels...) + } + gcWorker, err := gcworker.NewMockGCWorker(s.store) + require.NoError(t, err) + tk := testkit.NewTestKit(t, s.store) + fCancel := TempDisableEmulatorGC() + defer fCancel() + // Disable emulator GC, otherwise delete range will be automatically called. + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/gcworker/ignoreDeleteRangeFailed", `return`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/gcworker/ignoreDeleteRangeFailed")) + }() + + fCancelPD := s.SetPdLoop(10000) + defer fCancelPD() + + // Clean all rules + s.tiflash.CleanPlacementRules() + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflash") + tk.MustExec("drop table if exists ddltiflashp") + tk.MustExec("create table ddltiflash(z int)") + tk.MustExec("create table ddltiflashp(z int) PARTITION BY RANGE(z) (PARTITION p0 VALUES LESS THAN (10),PARTITION p1 VALUES LESS THAN (20), PARTITION p2 VALUES LESS THAN (30))") + + total := 0 + require.Equal(t, total, s.tiflash.PlacementRulesLen()) + + tk.MustExec("alter table ddltiflash set tiflash replica 1") + total += 1 + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable) + require.Equal(t, total, s.tiflash.PlacementRulesLen()) + + tk.MustExec("alter table ddltiflashp set tiflash replica 1") + total += 3 + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) + require.Equal(t, total, s.tiflash.PlacementRulesLen()) + + lessThan := 40 + tk.MustExec(fmt.Sprintf("ALTER TABLE ddltiflashp ADD PARTITION (PARTITION pn VALUES LESS THAN (%v))", lessThan)) + total += 1 + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) + require.Equal(t, total, s.tiflash.PlacementRulesLen()) + + tk.MustExec("alter table ddltiflashp truncate partition p1") + total += 1 + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) + require.Equal(t, total, s.tiflash.PlacementRulesLen()) + // Now gc will trigger, and will remove dropped partition. 
+ require.NoError(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) + total -= 1 + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) + require.Equal(t, total, s.tiflash.PlacementRulesLen()) + + tk.MustExec("alter table ddltiflashp drop partition p2") + require.NoError(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) + total -= 1 + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) + require.Equal(t, total, s.tiflash.PlacementRulesLen()) + + tk.MustExec("truncate table ddltiflash") + total += 1 + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) + require.Equal(t, total, s.tiflash.PlacementRulesLen()) + require.NoError(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) + total -= 1 + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) + require.Equal(t, total, s.tiflash.PlacementRulesLen()) + + tk.MustExec("drop table ddltiflash") + total -= 1 + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) + require.NoError(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) + require.Equal(t, total, s.tiflash.PlacementRulesLen()) +} + +func TestTiFlashReplicaPartitionTableNormal(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflash") + tk.MustExec("create table ddltiflash(z int) PARTITION BY RANGE(z) (PARTITION p0 VALUES LESS THAN (10),PARTITION p1 VALUES LESS THAN (20), PARTITION p2 VALUES LESS THAN (30))") + + tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + replica := tb.Meta().TiFlashReplica + require.Nil(t, replica) + + tk.MustExec("alter table ddltiflash set tiflash replica 1") + lessThan := "40" + tk.MustExec(fmt.Sprintf("ALTER TABLE ddltiflash ADD PARTITION (PARTITION pn VALUES LESS THAN (%v))", lessThan)) + + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) + // Should get schema again + CheckTableAvailable(s.dom, t, 1, []string{}) + + tb2, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + require.NotNil(t, tb2) + pi := tb2.Meta().GetPartitionInfo() + require.NotNil(t, pi) + require.NotNil(t, tb2.Meta().TiFlashReplica) + for _, p := range pi.Definitions { + require.True(t, tb2.Meta().TiFlashReplica.IsPartitionAvailable(p.ID)) + if len(p.LessThan) == 1 && p.LessThan[0] == lessThan { + table, ok := s.tiflash.GetTableSyncStatus(int(p.ID)) + require.True(t, ok) + require.True(t, table.Accel) + } + } + require.Zero(t, len(pi.AddingDefinitions)) + s.CheckFlashback(tk, t) +} + +// When block add partition, new partition shall be available even we break `UpdateTableReplicaInfo` +func TestTiFlashReplicaPartitionTableBlock(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflash") + tk.MustExec("create table ddltiflash(z int) PARTITION BY RANGE(z) (PARTITION p0 VALUES LESS THAN (10),PARTITION p1 VALUES LESS THAN (20), PARTITION p2 VALUES LESS THAN (30))") + tk.MustExec("alter table ddltiflash set tiflash replica 1") + // Make sure is available + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) + CheckTableAvailable(s.dom, t, 1, []string{}) + + lessThan := "40" + // Stop loop + require.NoError(t, 
failpoint.Enable("github.com/pingcap/tidb/ddl/BeforeRefreshTiFlashTickeLoop", `return`)) + defer func() { + _ = failpoint.Disable("github.com/pingcap/tidb/ddl/BeforeRefreshTiFlashTickeLoop") + }() + + tk.MustExec(fmt.Sprintf("ALTER TABLE ddltiflash ADD PARTITION (PARTITION pn VALUES LESS THAN (%v))", lessThan)) + tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + pi := tb.Meta().GetPartitionInfo() + require.NotNil(t, pi) + + // Partition `lessThan` shall be ready + for _, p := range pi.Definitions { + require.True(t, tb.Meta().TiFlashReplica.IsPartitionAvailable(p.ID)) + if len(p.LessThan) == 1 && p.LessThan[0] == lessThan { + table, ok := s.tiflash.GetTableSyncStatus(int(p.ID)) + require.True(t, ok) + require.True(t, table.Accel) + } + } + require.Equal(t, 0, len(pi.AddingDefinitions)) + s.CheckFlashback(tk, t) +} + +// TiFlash Table shall be eventually available. +func TestTiFlashReplicaAvailable(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflash") + tk.MustExec("create table ddltiflash(z int)") + tk.MustExec("alter table ddltiflash set tiflash replica 1") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + CheckTableAvailable(s.dom, t, 1, []string{}) + + tk.MustExec("drop table if exists ddltiflash2") + tk.MustExec("create table ddltiflash2 like ddltiflash") + tk.MustExec("alter table ddltiflash2 set tiflash replica 1") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", "ddltiflash2") + + s.CheckFlashback(tk, t) + tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + r, ok := s.tiflash.GetPlacementRule(fmt.Sprintf("table-%v-r", tb.Meta().ID)) + require.NotNil(t, r) + require.True(t, ok) + tk.MustExec("alter table ddltiflash set tiflash replica 0") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable) + tb, err = s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + replica := tb.Meta().TiFlashReplica + require.Nil(t, replica) + r, ok = s.tiflash.GetPlacementRule(fmt.Sprintf("table-%v-r", tb.Meta().ID)) + require.Nil(t, r) + require.False(t, ok) +} + +// Truncate partition shall not block. +func TestTiFlashTruncatePartition(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflash") + tk.MustExec("create table ddltiflash(i int not null, s varchar(255)) partition by range (i) (partition p0 values less than (10), partition p1 values less than (20))") + tk.MustExec("alter table ddltiflash set tiflash replica 1") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) + tk.MustExec("insert into ddltiflash values(1, 'abc'), (11, 'def')") + tk.MustExec("alter table ddltiflash truncate partition p1") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) + CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", "ddltiflash") +} + +// Fail truncate partition. 
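+// The FailTiFlashTruncatePartition failpoint forces the truncate to fail with an
+// enforced error, and the original partition is expected to remain available afterwards.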
+func TestTiFlashFailTruncatePartition(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflash") + tk.MustExec("create table ddltiflash(i int not null, s varchar(255)) partition by range (i) (partition p0 values less than (10), partition p1 values less than (20))") + tk.MustExec("alter table ddltiflash set tiflash replica 1") + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/FailTiFlashTruncatePartition", `return`)) + defer func() { + failpoint.Disable("github.com/pingcap/tidb/ddl/FailTiFlashTruncatePartition") + }() + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) + + tk.MustExec("insert into ddltiflash values(1, 'abc'), (11, 'def')") + tk.MustGetErrMsg("alter table ddltiflash truncate partition p1", "[ddl:-1]enforced error") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) + CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", "ddltiflash") +} + +// Drop partition shall not block. +func TestTiFlashDropPartition(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflash") + tk.MustExec("create table ddltiflash(i int not null, s varchar(255)) partition by range (i) (partition p0 values less than (10), partition p1 values less than (20))") + tk.MustExec("alter table ddltiflash set tiflash replica 1") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) + CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", "ddltiflash") + tk.MustExec("alter table ddltiflash drop partition p1") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable * 5) + CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", "ddltiflash") +} + +func TestTiFlashFlashbackCluster(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + tk.MustExec("insert into t values (1), (2), (3)") + + ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{}) + require.NoError(t, err) + + tk.MustExec("alter table t set tiflash replica 1") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable) + CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", "t") + + injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockFlashbackTest", `return(true)`)) + require.NoError(t, failpoint.Enable("tikvclient/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/expression/injectSafeTS", + fmt.Sprintf("return(%v)", injectSafeTS))) + + ChangeGCSafePoint(tk, time.Now().Add(-10*time.Second), "true", "10m0s") + defer func() { + ChangeGCSafePoint(tk, time.Now(), "true", "10m0s") + }() + + errorMsg := fmt.Sprintf("[ddl:-1]Detected unsupported DDL job type(%s) during [%s, now), can't do flashback", + model.ActionSetTiFlashReplica.String(), oracle.GetTimeFromTS(ts).String()) + tk.MustGetErrMsg(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts)), errorMsg) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockFlashbackTest")) + 
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/expression/injectSafeTS")) + require.NoError(t, failpoint.Disable("tikvclient/injectSafeTS")) +} + +func CheckTableAvailableWithTableName(dom *domain.Domain, t *testing.T, count uint64, labels []string, db string, table string) { + tb, err := dom.InfoSchema().TableByName(model.NewCIStr(db), model.NewCIStr(table)) + require.NoError(t, err) + replica := tb.Meta().TiFlashReplica + require.NotNil(t, replica) + require.True(t, replica.Available) + require.Equal(t, count, replica.Count) + require.ElementsMatch(t, labels, replica.LocationLabels) +} + +func CheckTableAvailable(dom *domain.Domain, t *testing.T, count uint64, labels []string) { + CheckTableAvailableWithTableName(dom, t, count, labels, "test", "ddltiflash") +} + +func CheckTableNoReplica(dom *domain.Domain, t *testing.T, db string, table string) { + tb, err := dom.InfoSchema().TableByName(model.NewCIStr(db), model.NewCIStr(table)) + require.NoError(t, err) + replica := tb.Meta().TiFlashReplica + require.Nil(t, replica) +} + +// Truncate table shall not block. +func TestTiFlashTruncateTable(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflashp") + tk.MustExec("create table ddltiflashp(z int not null) partition by range (z) (partition p0 values less than (10), partition p1 values less than (20))") + tk.MustExec("alter table ddltiflashp set tiflash replica 1") + + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) + // Should get schema right now + tk.MustExec("truncate table ddltiflashp") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailablePartitionTable) + CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", "ddltiflashp") + tk.MustExec("drop table if exists ddltiflash2") + tk.MustExec("create table ddltiflash2(z int)") + tk.MustExec("alter table ddltiflash2 set tiflash replica 1") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable) + // Should get schema right now + + tk.MustExec("truncate table ddltiflash2") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable) + CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", "ddltiflash2") +} + +// TiFlash Table shall be eventually available, even with lots of small table created. +func TestTiFlashMassiveReplicaAvailable(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + for i := 0; i < 100; i++ { + tk.MustExec(fmt.Sprintf("drop table if exists ddltiflash%v", i)) + tk.MustExec(fmt.Sprintf("create table ddltiflash%v(z int)", i)) + tk.MustExec(fmt.Sprintf("alter table ddltiflash%v set tiflash replica 1", i)) + } + + time.Sleep(ddl.PollTiFlashInterval * 10) + // Should get schema right now + for i := 0; i < 100; i++ { + CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", fmt.Sprintf("ddltiflash%v", i)) + } +} + +// When set TiFlash replica, tidb shall add one Pd Rule for this table. +// When drop/truncate table, Pd Rule shall be removed in limited time. 
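+// After DROP TABLE the rule is still expected to be present in this test; the actual
+// removal path is exercised in the GC worker test below.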
+func TestSetPlacementRuleNormal(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflash") + tk.MustExec("create table ddltiflash(z int)") + tk.MustExec("alter table ddltiflash set tiflash replica 1 location labels 'a','b'") + tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + expectRule := infosync.MakeNewRule(tb.Meta().ID, 1, []string{"a", "b"}) + res := s.tiflash.CheckPlacementRule(*expectRule) + require.True(t, res) + + // Set lastSafePoint to a timepoint in future, so all dropped table can be reckon as gc-ed. + ChangeGCSafePoint(tk, time.Now().Add(+3*time.Second), "true", "10m0s") + defer func() { + ChangeGCSafePoint(tk, time.Now(), "true", "10m0s") + }() + fCancelPD := s.SetPdLoop(1) + defer fCancelPD() + tk.MustExec("drop table ddltiflash") + expectRule = infosync.MakeNewRule(tb.Meta().ID, 1, []string{"a", "b"}) + res = s.tiflash.CheckPlacementRule(*expectRule) + require.True(t, res) +} + +// When gc worker works, it will automatically remove pd rule for TiFlash. + +func TestSetPlacementRuleWithGCWorker(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + + rpcClient, pdClient, cluster, err := unistore.New("") + defer func() { + rpcClient.Close() + pdClient.Close() + cluster.Close() + }() + for _, store := range s.cluster.GetAllStores() { + cluster.AddStore(store.Id, store.Address, store.Labels...) + } + failpoint.Enable("github.com/pingcap/tidb/store/gcworker/ignoreDeleteRangeFailed", `return`) + defer func() { + failpoint.Disable("github.com/pingcap/tidb/store/gcworker/ignoreDeleteRangeFailed") + }() + fCancelPD := s.SetPdLoop(10000) + defer fCancelPD() + + require.NoError(t, err) + gcWorker, err := gcworker.NewMockGCWorker(s.store) + require.NoError(t, err) + // Make SetPdLoop take effects. + time.Sleep(time.Second) + + fCancel := TempDisableEmulatorGC() + defer fCancel() + + tk := testkit.NewTestKit(t, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflash_gc") + tk.MustExec("create table ddltiflash_gc(z int)") + tk.MustExec("alter table ddltiflash_gc set tiflash replica 1 location labels 'a','b'") + tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash_gc")) + require.NoError(t, err) + + expectRule := infosync.MakeNewRule(tb.Meta().ID, 1, []string{"a", "b"}) + res := s.tiflash.CheckPlacementRule(*expectRule) + require.True(t, res) + + ChangeGCSafePoint(tk, time.Now().Add(-time.Hour), "true", "10m0s") + tk.MustExec("drop table ddltiflash_gc") + // Now gc will trigger, and will remove dropped table. 
+ require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) + + // Wait GC + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable) + res = s.tiflash.CheckPlacementRule(*expectRule) + require.False(t, res) +} + +func TestSetPlacementRuleFail(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflash") + tk.MustExec("create table ddltiflash(z int)") + s.tiflash.PdSwitch(false) + defer func() { + s.tiflash.PdSwitch(true) + }() + tk.MustExec("alter table ddltiflash set tiflash replica 1") + tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + + expectRule := infosync.MakeNewRule(tb.Meta().ID, 1, []string{}) + res := s.tiflash.CheckPlacementRule(*expectRule) + require.False(t, res) +} + +// Test standalone backoffer +func TestTiFlashBackoffer(t *testing.T) { + var maxTick ddl.TiFlashTick = 10 + var rate ddl.TiFlashTick = 1.5 + c := 2 + backoff, err := ddl.NewPollTiFlashBackoffContext(1, maxTick, c, rate) + require.NoError(t, err) + mustGet := func(ID int64) *ddl.PollTiFlashBackoffElement { + e, ok := backoff.Get(ID) + require.True(t, ok) + return e + } + mustNotGrow := func(ID int64) { + e := mustGet(ID) + ori := e.Threshold + oriTotal := e.TotalCounter + c := e.Counter + growed, ok, total := backoff.Tick(ID) + require.True(t, ok) + require.False(t, growed) + require.Equal(t, ori, e.Threshold) + require.Equal(t, c+1, e.Counter) + require.Equal(t, oriTotal+1, total) + } + mustGrow := func(ID int64) { + e := mustGet(ID) + ori := e.Threshold + oriTotal := e.TotalCounter + growed, ok, total := backoff.Tick(ID) + require.True(t, ok) + require.True(t, growed) + require.Equal(t, e.Threshold, rate*ori) + require.Equal(t, 1, e.Counter) + require.Equal(t, oriTotal+1, total) + } + // Test grow + ok := backoff.Put(1) + require.True(t, ok) + require.False(t, mustGet(1).NeedGrow()) + mustNotGrow(1) // 0;1 -> 1;1 + mustGrow(1) // 1;1 -> 0;1.5 -> 1;1.5 + mustGrow(1) // 1;1.5 -> 0;2.25 -> 1;2.25 + mustNotGrow(1) // 1;2.25 -> 2;2.25 + mustGrow(1) // 2;2.25 -> 0;3.375 -> 1;3.375 + mustNotGrow(1) // 1;3.375 -> 2;3.375 + mustNotGrow(1) // 2;3.375 -> 3;3.375 + mustGrow(1) // 3;3.375 -> 0;5.0625 + require.Equal(t, 8, mustGet(1).TotalCounter) + + // Test converge + backoff.Put(2) + for i := 0; i < 20; i++ { + backoff.Tick(2) + } + require.Equal(t, maxTick, mustGet(2).Threshold) + require.Equal(t, 20, mustGet(2).TotalCounter) + + // Test context + ok = backoff.Put(3) + require.False(t, ok) + _, ok, _ = backoff.Tick(3) + require.False(t, ok) + + require.True(t, backoff.Remove(1)) + require.False(t, backoff.Remove(1)) + require.Equal(t, 1, backoff.Len()) + + // Test error context + _, err = ddl.NewPollTiFlashBackoffContext(0.5, 1, 1, 1) + require.Error(t, err) + _, err = ddl.NewPollTiFlashBackoffContext(10, 1, 1, 1) + require.Error(t, err) + _, err = ddl.NewPollTiFlashBackoffContext(1, 10, 0, 1) + require.Error(t, err) + _, err = ddl.NewPollTiFlashBackoffContext(1, 10, 1, 0.5) + require.Error(t, err) + _, err = ddl.NewPollTiFlashBackoffContext(1, 10, 1, -1) + require.Error(t, err) +} + +// Test backoffer in TiFlash. 
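+// The two failpoints force every poll to see the replica as unavailable, so the backoff
+// threshold keeps growing; once they are disabled, availability is expected to be
+// observed within a few more poll rounds.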
+func TestTiFlashBackoff(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflash") + tk.MustExec("create table ddltiflash(z int)") + + // Not available for all tables + ddl.DisableTiFlashPoll(s.dom.DDL()) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplacePrevAvailableValue", `return(false)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplaceCurAvailableValue", `return(false)`)) + ddl.EnableTiFlashPoll(s.dom.DDL()) + tk.MustExec("alter table ddltiflash set tiflash replica 1") + + // 1, 1.5, 2.25, 3.375, 5.5625 + // (1), 1, 1, 2, 3, 5 + time.Sleep(ddl.PollTiFlashInterval * 5) + tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + require.NotNil(t, tb) + require.False(t, tb.Meta().TiFlashReplica.Available) + + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplacePrevAvailableValue")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplaceCurAvailableValue")) + + time.Sleep(ddl.PollTiFlashInterval * 3) + tb, err = s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + require.NotNil(t, tb) + require.True(t, tb.Meta().TiFlashReplica.Available) +} + +func TestAlterDatabaseErrorGrammar(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 SET TIFLASH REPLICA 2 LOCATION LABELS 'a','b'", "[ddl:8200]Unsupported multi schema change for set tiflash replica") + tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 SET TIFLASH REPLICA 2", "[ddl:8200]Unsupported multi schema change for set tiflash replica") + tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 LOCATION LABELS 'a','b' SET TIFLASH REPLICA 2", "[ddl:8200]Unsupported multi schema change for set tiflash replica") + tk.MustGetErrMsg("ALTER DATABASE t SET TIFLASH REPLICA 1 LOCATION LABELS 'a','b' SET TIFLASH REPLICA 2 LOCATION LABELS 'a','b'", "[ddl:8200]Unsupported multi schema change for set tiflash replica") +} + +func TestAlterDatabaseBasic(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("drop database if exists tiflash_ddl") + tk.MustExec("create database tiflash_ddl") + tk.MustExec("create table tiflash_ddl.ddltiflash(z int)") + tk.MustExec("create table tiflash_ddl.ddltiflash2(z int)") + // ALTER DATABASE can override previous ALTER TABLE. + tk.MustExec("alter table tiflash_ddl.ddltiflash set tiflash replica 1") + tk.MustExec("alter database tiflash_ddl set tiflash replica 2") + require.Equal(t, "In total 2 tables: 2 succeed, 0 failed, 0 skipped", tk.Session().GetSessionVars().StmtCtx.GetMessage()) + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 2) + CheckTableAvailableWithTableName(s.dom, t, 2, []string{}, "tiflash_ddl", "ddltiflash") + CheckTableAvailableWithTableName(s.dom, t, 2, []string{}, "tiflash_ddl", "ddltiflash2") + + // Skip already set TiFlash tables. 
+ tk.MustExec("alter database tiflash_ddl set tiflash replica 2") + require.Equal(t, "In total 2 tables: 0 succeed, 0 failed, 2 skipped", tk.Session().GetSessionVars().StmtCtx.GetMessage()) + CheckTableAvailableWithTableName(s.dom, t, 2, []string{}, "tiflash_ddl", "ddltiflash") + CheckTableAvailableWithTableName(s.dom, t, 2, []string{}, "tiflash_ddl", "ddltiflash2") + + // There is no existing database. + tk.MustExec("drop database if exists tiflash_ddl_missing") + tk.MustGetErrMsg("alter database tiflash_ddl_missing set tiflash replica 2", "[schema:1049]Unknown database 'tiflash_ddl_missing'") + + // There is no table in database + tk.MustExec("drop database if exists tiflash_ddl_empty") + tk.MustExec("create database tiflash_ddl_empty") + tk.MustGetErrMsg("alter database tiflash_ddl_empty set tiflash replica 2", "[schema:1049]Empty database 'tiflash_ddl_empty'") + + // There is less TiFlash store + tk.MustGetErrMsg("alter database tiflash_ddl set tiflash replica 3", "the tiflash replica count: 3 should be less than the total tiflash server count: 2") +} + +func checkBatchPandingNum(t *testing.T, tkx *testkit.TestKit, level string, value string, ok bool) { + l := len(tkx.MustQuery(fmt.Sprintf("show %v variables where Variable_name='tidb_batch_pending_tiflash_count' and Value='%v'", level, value)).Rows()) + if ok { + require.Equal(t, 1, l) + } else { + require.Equal(t, 0, l) + } +} + +func TestTiFlashBatchAddVariables(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("set SESSION tidb_batch_pending_tiflash_count=5") + tk.MustExec("set GLOBAL tidb_batch_pending_tiflash_count=6") + + checkBatchPandingNum(t, tk, "session", "5", true) + checkBatchPandingNum(t, tk, "global", "6", true) + checkBatchPandingNum(t, tk, "global", "1.5", false) + + tk.MustGetErrMsg("set GLOBAL tidb_batch_pending_tiflash_count=1.5", "[variable:1232]Incorrect argument type to variable 'tidb_batch_pending_tiflash_count'") + checkBatchPandingNum(t, tk, "global", "6", true) + + tk2 := testkit.NewTestKit(t, store) + checkBatchPandingNum(t, tk2, "session", "6", true) +} + +func execWithTimeout(t *testing.T, tk *testkit.TestKit, to time.Duration, sql string) (bool, error) { + ctx, cancel := context.WithTimeout(context.Background(), to) + defer cancel() + doneCh := make(chan error, 1) + + go func() { + _, err := tk.Exec(sql) + doneCh <- err + }() + + select { + case e := <-doneCh: + // Exit normally + return false, e + case <-ctx.Done(): + // Exceed given timeout + logutil.BgLogger().Info("execWithTimeout meet timeout", zap.String("sql", sql)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/BatchAddTiFlashSendDone", "return(true)")) + } + + e := <-doneCh + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/BatchAddTiFlashSendDone")) + return true, e +} + +func TestTiFlashBatchRateLimiter(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + threshold := 2 + tk.MustExec("create database tiflash_ddl_limit") + tk.MustExec(fmt.Sprintf("set SESSION tidb_batch_pending_tiflash_count=%v", threshold)) + for i := 0; i < threshold; i++ { + tk.MustExec(fmt.Sprintf("create table tiflash_ddl_limit.t%v(z int)", i)) + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplaceCurAvailableValue", `return(false)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplaceCurAvailableValue")) 
+ }() + + tk.MustExec("alter database tiflash_ddl_limit set tiflash replica 1") + tk.MustExec(fmt.Sprintf("create table tiflash_ddl_limit.t%v(z int)", threshold)) + // The following statement shall fail, because it reaches limit + timeOut, err := execWithTimeout(t, tk, time.Second*1, "alter database tiflash_ddl_limit set tiflash replica 1") + require.NoError(t, err) + require.True(t, timeOut) + + // There must be one table with no TiFlashReplica. + check := func(expected int, total int) { + cnt := 0 + for i := 0; i < total; i++ { + tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_ddl_limit"), model.NewCIStr(fmt.Sprintf("t%v", i))) + require.NoError(t, err) + if tb.Meta().TiFlashReplica != nil { + cnt++ + } + } + require.Equal(t, expected, cnt) + } + check(2, 3) + + // If we exec in another session, it will not trigger limit. Since DefTiDBBatchPendingTiFlashCount is more than 3. + tk2 := testkit.NewTestKit(t, s.store) + tk2.MustExec("alter database tiflash_ddl_limit set tiflash replica 1") + check(3, 3) + + loop := 3 + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/FastFailCheckTiFlashPendingTables", fmt.Sprintf("return(%v)", loop))) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/FastFailCheckTiFlashPendingTables")) + }() + // We will force trigger its DDL to update schema cache. + tk.MustExec(fmt.Sprintf("create table tiflash_ddl_limit.t%v(z int)", threshold+1)) + timeOut, err = execWithTimeout(t, tk, time.Millisecond*time.Duration(200*(loop+1)), "alter database tiflash_ddl_limit set tiflash replica 1") + require.NoError(t, err) + require.False(t, timeOut) + check(4, 4) + + // However, forceCheck is true, so we will still enter try loop. + tk.MustExec(fmt.Sprintf("create table tiflash_ddl_limit.t%v(z int)", threshold+2)) + timeOut, err = execWithTimeout(t, tk, time.Millisecond*200, "alter database tiflash_ddl_limit set tiflash replica 1") + require.NoError(t, err) + require.True(t, timeOut) + check(4, 5) + + // Retrigger, but close session before the whole job ends. 
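+	// Close the session from another goroutine while the batch ALTER is still running;
+	// the statement is still expected to finish and set the replica for the remaining table.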
+ var wg util.WaitGroupWrapper + var mu sync.Mutex + wg.Run(func() { + time.Sleep(time.Millisecond * 20) + mu.Lock() + defer mu.Unlock() + tk.Session().Close() + logutil.BgLogger().Info("session closed") + }) + mu.Lock() + timeOut, err = execWithTimeout(t, tk, time.Second*2, "alter database tiflash_ddl_limit set tiflash replica 1") + mu.Unlock() + require.NoError(t, err) + require.False(t, timeOut) + check(5, 5) + wg.Wait() +} + +func TestTiFlashBatchKill(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("create database tiflash_ddl_limit") + tk.MustExec("set SESSION tidb_batch_pending_tiflash_count=0") + tk.MustExec("create table tiflash_ddl_limit.t0(z int)") + + var wg util.WaitGroupWrapper + wg.Run(func() { + time.Sleep(time.Millisecond * 100) + sessVars := tk.Session().GetSessionVars() + atomic.StoreUint32(&sessVars.Killed, 1) + }) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/FastFailCheckTiFlashPendingTables", `return(2)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/FastFailCheckTiFlashPendingTables")) + }() + timeOut, err := execWithTimeout(t, tk, time.Second*2000, "alter database tiflash_ddl_limit set tiflash replica 1") + require.Error(t, err, "[executor:1317]Query execution was interrupted") + require.False(t, timeOut) + wg.Wait() +} + +func TestTiFlashBatchUnsupported(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("create database tiflash_ddl_view") + tk.MustExec("create table tiflash_ddl_view.t(z int)") + tk.MustExec("insert into tiflash_ddl_view.t values (1)") + tk.MustExec("CREATE VIEW tiflash_ddl_view.v AS select * from tiflash_ddl_view.t") + tk.MustExec("alter database tiflash_ddl_view set tiflash replica 1") + require.Equal(t, "In total 2 tables: 1 succeed, 0 failed, 1 skipped", tk.Session().GetSessionVars().StmtCtx.GetMessage()) + tk.MustGetErrCode("alter database information_schema set tiflash replica 1", 8200) +} + +func TestTiFlashProgress(t *testing.T) { + s, teardown := createTiFlashContext(t) + s.tiflash.NotAvailable = true + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("create database tiflash_d") + tk.MustExec("create table tiflash_d.t(z int)") + tk.MustExec("alter table tiflash_d.t set tiflash replica 1") + tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t")) + require.NoError(t, err) + require.NotNil(t, tb) + mustExist := func(tid int64) { + _, isExist := infosync.GetTiFlashProgressFromCache(tid) + require.True(t, isExist) + } + mustAbsent := func(tid int64) { + _, isExist := infosync.GetTiFlashProgressFromCache(tid) + require.False(t, isExist) + } + infosync.UpdateTiFlashProgressCache(tb.Meta().ID, 5.0) + mustExist(tb.Meta().ID) + _ = infosync.DeleteTiFlashTableSyncProgress(tb.Meta()) + mustAbsent(tb.Meta().ID) + + infosync.UpdateTiFlashProgressCache(tb.Meta().ID, 5.0) + tk.MustExec("truncate table tiflash_d.t") + mustAbsent(tb.Meta().ID) + + tb, _ = s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t")) + infosync.UpdateTiFlashProgressCache(tb.Meta().ID, 5.0) + tk.MustExec("alter table tiflash_d.t set tiflash replica 0") + mustAbsent(tb.Meta().ID) + tk.MustExec("alter table tiflash_d.t set tiflash replica 1") + + tb, _ = s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t")) + 
infosync.UpdateTiFlashProgressCache(tb.Meta().ID, 5.0) + tk.MustExec("drop table tiflash_d.t") + mustAbsent(tb.Meta().ID) + + time.Sleep(100 * time.Millisecond) +} + +func TestTiFlashProgressForPartitionTable(t *testing.T) { + s, teardown := createTiFlashContext(t) + s.tiflash.NotAvailable = true + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("create database tiflash_d") + tk.MustExec("create table tiflash_d.t(z int) PARTITION BY RANGE(z) (PARTITION p0 VALUES LESS THAN (10))") + tk.MustExec("alter table tiflash_d.t set tiflash replica 1") + tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t")) + require.NoError(t, err) + require.NotNil(t, tb) + mustExist := func(tid int64) { + _, isExist := infosync.GetTiFlashProgressFromCache(tid) + require.True(t, isExist) + } + mustAbsent := func(tid int64) { + _, isExist := infosync.GetTiFlashProgressFromCache(tid) + require.False(t, isExist) + } + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable) + mustExist(tb.Meta().Partition.Definitions[0].ID) + _ = infosync.DeleteTiFlashTableSyncProgress(tb.Meta()) + mustAbsent(tb.Meta().Partition.Definitions[0].ID) + + infosync.UpdateTiFlashProgressCache(tb.Meta().Partition.Definitions[0].ID, 5.0) + tk.MustExec("truncate table tiflash_d.t") + mustAbsent(tb.Meta().Partition.Definitions[0].ID) + + tb, _ = s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t")) + infosync.UpdateTiFlashProgressCache(tb.Meta().Partition.Definitions[0].ID, 5.0) + tk.MustExec("alter table tiflash_d.t set tiflash replica 0") + mustAbsent(tb.Meta().Partition.Definitions[0].ID) + tk.MustExec("alter table tiflash_d.t set tiflash replica 1") + + tb, _ = s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t")) + infosync.UpdateTiFlashProgressCache(tb.Meta().Partition.Definitions[0].ID, 5.0) + tk.MustExec("drop table tiflash_d.t") + mustAbsent(tb.Meta().Partition.Definitions[0].ID) + + time.Sleep(100 * time.Millisecond) +} + +func TestTiFlashGroupIndexWhenStartup(t *testing.T) { + s, teardown := createTiFlashContext(t) + tiflash := s.tiflash + defer teardown() + _ = testkit.NewTestKit(t, s.store) + timeout := time.Now().Add(10 * time.Second) + errMsg := "time out" + for time.Now().Before(timeout) { + time.Sleep(100 * time.Millisecond) + if tiflash.GetRuleGroupIndex() != 0 { + errMsg = "invalid group index" + break + } + } + require.Equal(t, placement.RuleIndexTiFlash, tiflash.GetRuleGroupIndex(), errMsg) + require.Greater(t, tiflash.GetRuleGroupIndex(), placement.RuleIndexTable) + require.Greater(t, tiflash.GetRuleGroupIndex(), placement.RuleIndexPartition) +} + +func TestTiFlashProgressAfterAvailable(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflash") + tk.MustExec("create table ddltiflash(z int)") + tk.MustExec("alter table ddltiflash set tiflash replica 1") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + CheckTableAvailable(s.dom, t, 1, []string{}) + + tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + require.NotNil(t, tb) + // after available, progress should can be updated. 
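+	// Expected sequence, as asserted below: resetting the sync status to "no regions" drives the
+	// cached progress back to 0, and restoring it lets a later poll round bring the progress back to 1.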
+ s.tiflash.ResetSyncStatus(int(tb.Meta().ID), false) + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + progress, isExist := infosync.GetTiFlashProgressFromCache(tb.Meta().ID) + require.True(t, isExist) + require.True(t, progress == 0) + + s.tiflash.ResetSyncStatus(int(tb.Meta().ID), true) + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + progress, isExist = infosync.GetTiFlashProgressFromCache(tb.Meta().ID) + require.True(t, isExist) + require.True(t, progress == 1) +} + +func TestTiFlashProgressAfterAvailableForPartitionTable(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflash") + tk.MustExec("create table ddltiflash(z int) PARTITION BY RANGE(z) (PARTITION p0 VALUES LESS THAN (10))") + tk.MustExec("alter table ddltiflash set tiflash replica 1") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + CheckTableAvailable(s.dom, t, 1, []string{}) + + tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + require.NotNil(t, tb) + // after available, progress should can be updated. + s.tiflash.ResetSyncStatus(int(tb.Meta().Partition.Definitions[0].ID), false) + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + progress, isExist := infosync.GetTiFlashProgressFromCache(tb.Meta().Partition.Definitions[0].ID) + require.True(t, isExist) + require.True(t, progress == 0) + + s.tiflash.ResetSyncStatus(int(tb.Meta().Partition.Definitions[0].ID), true) + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + progress, isExist = infosync.GetTiFlashProgressFromCache(tb.Meta().Partition.Definitions[0].ID) + require.True(t, isExist) + require.True(t, progress == 1) +} + +func TestTiFlashProgressCache(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflash") + tk.MustExec("create table ddltiflash(z int)") + tk.MustExec("alter table ddltiflash set tiflash replica 1") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + CheckTableAvailable(s.dom, t, 1, []string{}) + + tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + require.NotNil(t, tb) + infosync.UpdateTiFlashProgressCache(tb.Meta().ID, 0) + // after available, it will still update progress cache. 
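+	// The cache entry was forced to 0 above; the assertions below expect a later poll round to
+	// overwrite it with the real progress (1), even though the table is already available.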
+ time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + progress, isExist := infosync.GetTiFlashProgressFromCache(tb.Meta().ID) + require.True(t, isExist) + require.True(t, progress == 1) +} + +func TestTiFlashProgressAvailableList(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tableCount := 8 + tableNames := make([]string, tableCount) + tbls := make([]table.Table, tableCount) + + tk.MustExec("use test") + for i := 0; i < tableCount; i++ { + tableNames[i] = fmt.Sprintf("ddltiflash%d", i) + tk.MustExec(fmt.Sprintf("drop table if exists %s", tableNames[i])) + tk.MustExec(fmt.Sprintf("create table %s(z int)", tableNames[i])) + tk.MustExec(fmt.Sprintf("alter table %s set tiflash replica 1", tableNames[i])) + } + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + for i := 0; i < tableCount; i++ { + CheckTableAvailableWithTableName(s.dom, t, 1, []string{}, "test", tableNames[i]) + } + + // After available, reset TiFlash sync status. + for i := 0; i < tableCount; i++ { + var err error + tbls[i], err = s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr(tableNames[i])) + require.NoError(t, err) + require.NotNil(t, tbls[i]) + s.tiflash.ResetSyncStatus(int(tbls[i].Meta().ID), false) + } + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/PollAvailableTableProgressMaxCount", `return(2)`)) + defer func() { + _ = failpoint.Disable("github.com/pingcap/tidb/ddl/PollAvailableTableProgressMaxCount") + }() + + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable) + // Not all table have updated progress + UpdatedTableCount := 0 + for i := 0; i < tableCount; i++ { + progress, isExist := infosync.GetTiFlashProgressFromCache(tbls[i].Meta().ID) + require.True(t, isExist) + if progress == 0 { + UpdatedTableCount++ + } + } + require.NotEqual(t, tableCount, UpdatedTableCount) + require.NotEqual(t, 0, UpdatedTableCount) + for i := 0; i < tableCount; i++ { + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable) + } + // All table have updated progress + UpdatedTableCount = 0 + for i := 0; i < tableCount; i++ { + progress, isExist := infosync.GetTiFlashProgressFromCache(tbls[i].Meta().ID) + require.True(t, isExist) + if progress == 0 { + UpdatedTableCount++ + } + } + require.Equal(t, tableCount, UpdatedTableCount) +} + +func TestTiFlashAvailableAfterResetReplica(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflash") + tk.MustExec("create table ddltiflash(z int)") + tk.MustExec("alter table ddltiflash set tiflash replica 1") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + CheckTableAvailable(s.dom, t, 1, []string{}) + + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/infoschema/mockTiFlashStoreCount", `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/infoschema/mockTiFlashStoreCount")) + }() + + tk.MustExec("alter table ddltiflash set tiflash replica 2") + CheckTableAvailable(s.dom, t, 2, []string{}) + + tk.MustExec("alter table ddltiflash set tiflash replica 0") + tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + require.NotNil(t, tb) + require.Nil(t, tb.Meta().TiFlashReplica) +} + +func TestTiFlashPartitionNotAvailable(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := 
testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflash") + tk.MustExec("create table ddltiflash(z int) PARTITION BY RANGE(z) (PARTITION p0 VALUES LESS THAN (10))") + + tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + require.NotNil(t, tb) + + tk.MustExec("alter table ddltiflash set tiflash replica 1") + s.tiflash.ResetSyncStatus(int(tb.Meta().Partition.Definitions[0].ID), false) + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + + tb, err = s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + require.NotNil(t, tb) + replica := tb.Meta().TiFlashReplica + require.NotNil(t, replica) + require.False(t, replica.Available) + + s.tiflash.ResetSyncStatus(int(tb.Meta().Partition.Definitions[0].ID), true) + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + + tb, err = s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + require.NotNil(t, tb) + replica = tb.Meta().TiFlashReplica + require.NotNil(t, replica) + require.True(t, replica.Available) + + s.tiflash.ResetSyncStatus(int(tb.Meta().Partition.Definitions[0].ID), false) + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + require.NoError(t, err) + require.NotNil(t, tb) + replica = tb.Meta().TiFlashReplica + require.NotNil(t, replica) + require.True(t, replica.Available) +} + +func TestTiFlashAvailableAfterAddPartition(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflash") + tk.MustExec("create table ddltiflash(z int) PARTITION BY RANGE(z) (PARTITION p0 VALUES LESS THAN (10))") + tk.MustExec("alter table ddltiflash set tiflash replica 1") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + CheckTableAvailable(s.dom, t, 1, []string{}) + + tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + require.NotNil(t, tb) + + // still available after adding partition. 
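+	// The failpoints below stretch out the replica-only and add-partition steps so the TiFlash
+	// poller runs while ADD PARTITION is still in flight; the table should stay available and
+	// end up with two partitions.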
+ require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/sleepBeforeReplicaOnly", `return(2)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/waitForAddPartition", `return(3)`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplaceCurAvailableValue", `return(false)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/sleepBeforeReplicaOnly")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/waitForAddPartition")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/PollTiFlashReplicaStatusReplaceCurAvailableValue")) + }() + tk.MustExec("ALTER TABLE ddltiflash ADD PARTITION (PARTITION pn VALUES LESS THAN (20))") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + CheckTableAvailable(s.dom, t, 1, []string{}) + tb, err = s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash")) + require.NoError(t, err) + pi := tb.Meta().GetPartitionInfo() + require.NotNil(t, pi) + require.Equal(t, len(pi.Definitions), 2) +} + +func TestTiFlashAvailableAfterDownOneStore(t *testing.T) { + s, teardown := createTiFlashContext(t) + defer teardown() + tk := testkit.NewTestKit(t, s.store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists ddltiflash") + tk.MustExec("create table ddltiflash(z int) PARTITION BY RANGE(z) (PARTITION p0 VALUES LESS THAN (10))") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/OneTiFlashStoreDown", `return`)) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/domain/infosync/OneTiFlashStoreDown", `return`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/OneTiFlashStoreDown")) + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/domain/infosync/OneTiFlashStoreDown")) + }() + + tk.MustExec("alter table ddltiflash set tiflash replica 1") + time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable * 3) + CheckTableAvailable(s.dom, t, 1, []string{}) +} diff --git a/domain/infosync/tiflash_manager.go b/domain/infosync/tiflash_manager.go new file mode 100644 index 0000000000000..d5cc46f95db95 --- /dev/null +++ b/domain/infosync/tiflash_manager.go @@ -0,0 +1,897 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package infosync + +import ( + "bytes" + "context" + "encoding/hex" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "path" + "strconv" + "strings" + "sync" + "time" + + "github.com/gorilla/mux" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/ddl/placement" + "github.com/pingcap/tidb/store/helper" + "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/util/codec" + "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/pdapi" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" +) + +// TiFlashReplicaManager manages placement settings and replica progress for TiFlash. 
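+// Two implementations follow: TiFlashReplicaManagerCtx, which talks to PD over its HTTP API,
+// and mockTiFlashReplicaManagerCtx, which is backed by MockTiFlash for tests.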
+type TiFlashReplicaManager interface {
+	// SetTiFlashGroupConfig sets the group index of the TiFlash placement rule.
+	SetTiFlashGroupConfig(ctx context.Context) error
+	// SetPlacementRule is a helper function to set placement rule.
+	SetPlacementRule(ctx context.Context, rule placement.TiFlashRule) error
+	// DeletePlacementRule deletes the placement rule for a certain group.
+	DeletePlacementRule(ctx context.Context, group string, ruleID string) error
+	// GetGroupRules gets all placement rules in a certain group.
+	GetGroupRules(ctx context.Context, group string) ([]placement.TiFlashRule, error)
+	// PostAccelerateSchedule sends `regions/accelerate-schedule` request.
+	PostAccelerateSchedule(ctx context.Context, tableID int64) error
+	// GetRegionCountFromPD is a helper function calling `/stats/region`.
+	GetRegionCountFromPD(ctx context.Context, tableID int64, regionCount *int) error
+	// GetStoresStat gets the TiKV store information by accessing PD's API.
+	GetStoresStat(ctx context.Context) (*helper.StoresStat, error)
+	// CalculateTiFlashProgress calculates TiFlash replica progress.
+	CalculateTiFlashProgress(tableID int64, replicaCount uint64, TiFlashStores map[int64]helper.StoreStat) (float64, error)
+	// UpdateTiFlashProgressCache updates tiflashProgressCache.
+	UpdateTiFlashProgressCache(tableID int64, progress float64)
+	// GetTiFlashProgressFromCache gets TiFlash replica progress from tiflashProgressCache.
+	GetTiFlashProgressFromCache(tableID int64) (float64, bool)
+	// DeleteTiFlashProgressFromCache deletes TiFlash replica progress from tiflashProgressCache.
+	DeleteTiFlashProgressFromCache(tableID int64)
+	// CleanTiFlashProgressCache cleans the progress cache.
+	CleanTiFlashProgressCache()
+	// Close closes the TiFlashReplicaManager.
+	Close(ctx context.Context)
+}
+
+// TiFlashReplicaManagerCtx manages placement rules with PD and replica progress for TiFlash.
+type TiFlashReplicaManagerCtx struct {
+	etcdCli              *clientv3.Client
+	sync.RWMutex         // protect tiflashProgressCache
+	tiflashProgressCache map[int64]float64
+}
+
+// Close is called to close TiFlashReplicaManagerCtx.
+func (m *TiFlashReplicaManagerCtx) Close(ctx context.Context) {
+
+}
+
+func getTiFlashPeerWithoutLagCount(tiFlashStores map[int64]helper.StoreStat, tableID int64) (int, error) {
+	// storeIDs -> regionID, PD will not create two peers on the same store.
+	var flashPeerCount int
+	for _, store := range tiFlashStores {
+		regionReplica := make(map[int64]int)
+		err := helper.CollectTiFlashStatus(store.Store.StatusAddress, tableID, &regionReplica)
+		failpoint.Inject("OneTiFlashStoreDown", func() {
+			if store.Store.StateName == "Down" {
+				err = errors.New("mock TiFlash down")
+			}
+		})
+		if err != nil {
+			logutil.BgLogger().Error("Fail to get peer status from TiFlash.",
+				zap.Int64("tableID", tableID))
+			// Just skip down, offline or tombstone stores, because PD will migrate regions from these stores.
+			if store.Store.StateName == "Up" || store.Store.StateName == "Disconnected" {
+				return 0, err
+			}
+			continue
+		}
+		flashPeerCount += len(regionReplica)
+	}
+	return flashPeerCount, nil
+}
+
+// calculateTiFlashProgress calculates progress based on the region status from PD and TiFlash.
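+// The result is tiflashPeerCount / (regionCount * replicaCount), clamped to 1.
+// For example (hypothetical numbers), 4 regions with 2 replicas and 6 in-sync TiFlash peers
+// give a progress of 6 / (4*2) = 0.75.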
+func calculateTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores map[int64]helper.StoreStat) (float64, error) { + var regionCount int + if err := GetTiFlashRegionCountFromPD(context.Background(), tableID, ®ionCount); err != nil { + logutil.BgLogger().Error("Fail to get regionCount from PD.", + zap.Int64("tableID", tableID)) + return 0, errors.Trace(err) + } + + if regionCount == 0 { + logutil.BgLogger().Warn("region count getting from PD is 0.", + zap.Int64("tableID", tableID)) + return 0, fmt.Errorf("region count getting from PD is 0") + } + + tiflashPeerCount, err := getTiFlashPeerWithoutLagCount(tiFlashStores, tableID) + if err != nil { + logutil.BgLogger().Error("Fail to get peer count from TiFlash.", + zap.Int64("tableID", tableID)) + return 0, errors.Trace(err) + } + progress := float64(tiflashPeerCount) / float64(regionCount*int(replicaCount)) + if progress > 1 { // when pd do balance + logutil.BgLogger().Debug("TiFlash peer count > pd peer count, maybe doing balance.", + zap.Int64("tableID", tableID), zap.Int("tiflashPeerCount", tiflashPeerCount), zap.Int("regionCount", regionCount), zap.Uint64("replicaCount", replicaCount)) + progress = 1 + } + if progress < 1 { + logutil.BgLogger().Debug("TiFlash replica progress < 1.", + zap.Int64("tableID", tableID), zap.Int("tiflashPeerCount", tiflashPeerCount), zap.Int("regionCount", regionCount), zap.Uint64("replicaCount", replicaCount)) + } + return progress, nil +} + +// CalculateTiFlashProgress calculates TiFlash replica progress. +func (m *TiFlashReplicaManagerCtx) CalculateTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores map[int64]helper.StoreStat) (float64, error) { + return calculateTiFlashProgress(tableID, replicaCount, tiFlashStores) +} + +// UpdateTiFlashProgressCache updates tiflashProgressCache +func (m *TiFlashReplicaManagerCtx) UpdateTiFlashProgressCache(tableID int64, progress float64) { + m.Lock() + defer m.Unlock() + m.tiflashProgressCache[tableID] = progress +} + +// GetTiFlashProgressFromCache gets tiflash replica progress from tiflashProgressCache +func (m *TiFlashReplicaManagerCtx) GetTiFlashProgressFromCache(tableID int64) (float64, bool) { + m.RLock() + defer m.RUnlock() + progress, ok := m.tiflashProgressCache[tableID] + return progress, ok +} + +// DeleteTiFlashProgressFromCache delete tiflash replica progress from tiflashProgressCache +func (m *TiFlashReplicaManagerCtx) DeleteTiFlashProgressFromCache(tableID int64) { + m.Lock() + defer m.Unlock() + delete(m.tiflashProgressCache, tableID) +} + +// CleanTiFlashProgressCache clean progress cache +func (m *TiFlashReplicaManagerCtx) CleanTiFlashProgressCache() { + m.Lock() + defer m.Unlock() + m.tiflashProgressCache = make(map[int64]float64) +} + +// SetTiFlashGroupConfig sets the tiflash's rule group config +func (m *TiFlashReplicaManagerCtx) SetTiFlashGroupConfig(ctx context.Context) error { + res, err := doRequest(ctx, + "GetRuleGroupConfig", + m.etcdCli.Endpoints(), + path.Join(pdapi.Config, "rule_group", placement.TiFlashRuleGroupID), + "GET", + nil, + ) + + if err != nil { + return errors.Trace(err) + } + + var groupConfig placement.RuleGroupConfig + shouldUpdate := res == nil + if res != nil { + if err = json.Unmarshal(res, &groupConfig); err != nil { + return errors.Trace(err) + } + + if groupConfig.Index != placement.RuleIndexTiFlash || groupConfig.Override { + shouldUpdate = true + } + } + + if shouldUpdate { + groupConfig.ID = placement.TiFlashRuleGroupID + groupConfig.Index = placement.RuleIndexTiFlash + groupConfig.Override = 
false + + body, err := json.Marshal(&groupConfig) + if err != nil { + return errors.Trace(err) + } + + _, err = doRequest(ctx, + "SetRuleGroupConfig", + m.etcdCli.Endpoints(), + path.Join(pdapi.Config, "rule_group"), + "POST", + bytes.NewBuffer(body), + ) + + if err != nil { + return errors.Trace(err) + } + } + return nil +} + +// SetPlacementRule is a helper function to set placement rule. +func (m *TiFlashReplicaManagerCtx) SetPlacementRule(ctx context.Context, rule placement.TiFlashRule) error { + if err := m.SetTiFlashGroupConfig(ctx); err != nil { + return err + } + + if rule.Count == 0 { + return m.DeletePlacementRule(ctx, rule.GroupID, rule.ID) + } + j, _ := json.Marshal(rule) + buf := bytes.NewBuffer(j) + res, err := doRequest(ctx, "SetPlacementRule", m.etcdCli.Endpoints(), path.Join(pdapi.Config, "rule"), "POST", buf) + if err != nil { + return errors.Trace(err) + } + if res == nil { + return fmt.Errorf("TiFlashReplicaManagerCtx returns error in SetPlacementRule") + } + return nil +} + +// DeletePlacementRule is to delete placement rule for certain group. +func (m *TiFlashReplicaManagerCtx) DeletePlacementRule(ctx context.Context, group string, ruleID string) error { + res, err := doRequest(ctx, "DeletePlacementRule", m.etcdCli.Endpoints(), path.Join(pdapi.Config, "rule", group, ruleID), "DELETE", nil) + if err != nil { + return errors.Trace(err) + } + if res == nil { + return fmt.Errorf("TiFlashReplicaManagerCtx returns error in DeletePlacementRule") + } + return nil +} + +// GetGroupRules to get all placement rule in a certain group. +func (m *TiFlashReplicaManagerCtx) GetGroupRules(ctx context.Context, group string) ([]placement.TiFlashRule, error) { + res, err := doRequest(ctx, "GetGroupRules", m.etcdCli.Endpoints(), path.Join(pdapi.Config, "rules", "group", group), "GET", nil) + if err != nil { + return nil, errors.Trace(err) + } + if res == nil { + return nil, fmt.Errorf("TiFlashReplicaManagerCtx returns error in GetGroupRules") + } + + var rules []placement.TiFlashRule + err = json.Unmarshal(res, &rules) + if err != nil { + return nil, errors.Trace(err) + } + + return rules, nil +} + +// PostAccelerateSchedule sends `regions/accelerate-schedule` request. +func (m *TiFlashReplicaManagerCtx) PostAccelerateSchedule(ctx context.Context, tableID int64) error { + startKey := tablecodec.GenTableRecordPrefix(tableID) + endKey := tablecodec.EncodeTablePrefix(tableID + 1) + startKey = codec.EncodeBytes([]byte{}, startKey) + endKey = codec.EncodeBytes([]byte{}, endKey) + + input := map[string]string{ + "start_key": hex.EncodeToString(startKey), + "end_key": hex.EncodeToString(endKey), + } + j, err := json.Marshal(input) + if err != nil { + return errors.Trace(err) + } + buf := bytes.NewBuffer(j) + res, err := doRequest(ctx, "PostAccelerateSchedule", m.etcdCli.Endpoints(), "/pd/api/v1/regions/accelerate-schedule", "POST", buf) + if err != nil { + return errors.Trace(err) + } + if res == nil { + return fmt.Errorf("TiFlashReplicaManagerCtx returns error in PostAccelerateSchedule") + } + return nil +} + +// GetRegionCountFromPD is a helper function calling `/stats/region`. 
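+// It queries `/pd/api/v1/stats/region` with the table's encoded record-key range
+// ([t<tableID>_r, t<tableID+1>)) and copies the returned stats.Count into regionCount.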
+func (m *TiFlashReplicaManagerCtx) GetRegionCountFromPD(ctx context.Context, tableID int64, regionCount *int) error { + startKey := tablecodec.GenTableRecordPrefix(tableID) + endKey := tablecodec.EncodeTablePrefix(tableID + 1) + startKey = codec.EncodeBytes([]byte{}, startKey) + endKey = codec.EncodeBytes([]byte{}, endKey) + + p := fmt.Sprintf("/pd/api/v1/stats/region?start_key=%s&end_key=%s&count", + url.QueryEscape(string(startKey)), + url.QueryEscape(string(endKey))) + res, err := doRequest(ctx, "GetPDRegionStats", m.etcdCli.Endpoints(), p, "GET", nil) + if err != nil { + return errors.Trace(err) + } + if res == nil { + return fmt.Errorf("TiFlashReplicaManagerCtx returns error in GetRegionCountFromPD") + } + var stats helper.PDRegionStats + err = json.Unmarshal(res, &stats) + if err != nil { + return errors.Trace(err) + } + *regionCount = stats.Count + return nil +} + +// GetStoresStat gets the TiKV store information by accessing PD's api. +func (m *TiFlashReplicaManagerCtx) GetStoresStat(ctx context.Context) (*helper.StoresStat, error) { + var storesStat helper.StoresStat + res, err := doRequest(ctx, "GetStoresStat", m.etcdCli.Endpoints(), pdapi.Stores, "GET", nil) + if err != nil { + return nil, errors.Trace(err) + } + if res == nil { + return nil, fmt.Errorf("TiFlashReplicaManagerCtx returns error in GetStoresStat") + } + + err = json.Unmarshal(res, &storesStat) + if err != nil { + return nil, errors.Trace(err) + } + return &storesStat, err +} + +type mockTiFlashReplicaManagerCtx struct { + sync.RWMutex + // Set to nil if there is no need to set up a mock TiFlash server. + // Otherwise use NewMockTiFlash to create one. + tiflash *MockTiFlash + tiflashProgressCache map[int64]float64 +} + +func makeBaseRule() placement.TiFlashRule { + return placement.TiFlashRule{ + GroupID: placement.TiFlashRuleGroupID, + ID: "", + Index: placement.RuleIndexTiFlash, + Override: false, + Role: placement.Learner, + Count: 2, + Constraints: []placement.Constraint{ + { + Key: "engine", + Op: placement.In, + Values: []string{"tiflash"}, + }, + }, + } +} + +// MakeNewRule creates a pd rule for TiFlash. +func MakeNewRule(ID int64, Count uint64, LocationLabels []string) *placement.TiFlashRule { + ruleID := fmt.Sprintf("table-%v-r", ID) + startKey := tablecodec.GenTableRecordPrefix(ID) + endKey := tablecodec.EncodeTablePrefix(ID + 1) + startKey = codec.EncodeBytes([]byte{}, startKey) + endKey = codec.EncodeBytes([]byte{}, endKey) + + ruleNew := makeBaseRule() + ruleNew.ID = ruleID + ruleNew.StartKeyHex = startKey.String() + ruleNew.EndKeyHex = endKey.String() + ruleNew.Count = int(Count) + ruleNew.LocationLabels = LocationLabels + + return &ruleNew +} + +type mockTiFlashTableInfo struct { + Regions []int + Accel bool +} + +func (m *mockTiFlashTableInfo) String() string { + regionStr := "" + for _, s := range m.Regions { + regionStr = regionStr + strconv.Itoa(s) + "\n" + } + if regionStr == "" { + regionStr = "\n" + } + return fmt.Sprintf("%v\n%v", len(m.Regions), regionStr) +} + +// MockTiFlash mocks a TiFlash, with necessary Pd support. 
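+// It runs an httptest server that answers `/tiflash/sync-status/{tableid}` and `/config`,
+// and keeps placement rules and per-table sync status in memory so tests can flip a table's
+// availability with ResetSyncStatus.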
+type MockTiFlash struct { + sync.Mutex + groupIndex int + StatusAddr string + StatusServer *httptest.Server + SyncStatus map[int]mockTiFlashTableInfo + StoreInfo map[uint64]helper.StoreBaseStat + GlobalTiFlashPlacementRules map[string]placement.TiFlashRule + PdEnabled bool + TiflashDelay time.Duration + StartTime time.Time + NotAvailable bool +} + +func (tiflash *MockTiFlash) setUpMockTiFlashHTTPServer() { + tiflash.Lock() + defer tiflash.Unlock() + // mock TiFlash http server + router := mux.NewRouter() + server := httptest.NewServer(router) + // mock store stats stat + statusAddr := strings.TrimPrefix(server.URL, "http://") + statusAddrVec := strings.Split(statusAddr, ":") + statusPort, _ := strconv.Atoi(statusAddrVec[1]) + router.HandleFunc("/tiflash/sync-status/{tableid:\\d+}", func(w http.ResponseWriter, req *http.Request) { + tiflash.Lock() + defer tiflash.Unlock() + params := mux.Vars(req) + tableID, err := strconv.Atoi(params["tableid"]) + if err != nil { + w.WriteHeader(http.StatusNotFound) + return + } + table, ok := tiflash.SyncStatus[tableID] + if tiflash.NotAvailable { + // No region is available, so the table is not available. + table.Regions = []int{} + } + if !ok { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("0\n\n")) + return + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(table.String())) + }) + router.HandleFunc("/config", func(w http.ResponseWriter, req *http.Request) { + tiflash.Lock() + defer tiflash.Unlock() + s := fmt.Sprintf("{\n \"engine-store\": {\n \"http_port\": %v\n }\n}", statusPort) + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(s)) + }) + tiflash.StatusServer = server + tiflash.StatusAddr = statusAddr +} + +// NewMockTiFlash creates a MockTiFlash with a mocked TiFlash server. +func NewMockTiFlash() *MockTiFlash { + tiflash := &MockTiFlash{ + StatusAddr: "", + StatusServer: nil, + SyncStatus: make(map[int]mockTiFlashTableInfo), + StoreInfo: make(map[uint64]helper.StoreBaseStat), + GlobalTiFlashPlacementRules: make(map[string]placement.TiFlashRule), + PdEnabled: true, + TiflashDelay: 0, + StartTime: time.Now(), + NotAvailable: false, + } + tiflash.setUpMockTiFlashHTTPServer() + return tiflash +} + +// HandleSetPlacementRule is mock function for SetTiFlashPlacementRule. +func (tiflash *MockTiFlash) HandleSetPlacementRule(rule placement.TiFlashRule) error { + tiflash.Lock() + defer tiflash.Unlock() + tiflash.groupIndex = placement.RuleIndexTiFlash + if !tiflash.PdEnabled { + logutil.BgLogger().Info("pd server is manually disabled, just quit") + return nil + } + + if rule.Count == 0 { + delete(tiflash.GlobalTiFlashPlacementRules, rule.ID) + } else { + tiflash.GlobalTiFlashPlacementRules[rule.ID] = rule + } + // Pd shall schedule TiFlash, we can mock here + tid := 0 + _, err := fmt.Sscanf(rule.ID, "table-%d-r", &tid) + if err != nil { + return errors.New("Can't parse rule") + } + // Set up mock TiFlash replica + f := func() { + if z, ok := tiflash.SyncStatus[tid]; ok { + z.Regions = []int{1} + tiflash.SyncStatus[tid] = z + } else { + tiflash.SyncStatus[tid] = mockTiFlashTableInfo{ + Regions: []int{1}, + Accel: false, + } + } + } + if tiflash.TiflashDelay > 0 { + go func() { + time.Sleep(tiflash.TiflashDelay) + logutil.BgLogger().Warn("TiFlash replica is available after delay", zap.Duration("duration", tiflash.TiflashDelay)) + f() + }() + } else { + f() + } + return nil +} + +// ResetSyncStatus is mock function for reset sync status. 
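+// Passing canAvailable=true gives the table a single mock region (so its progress can reach 1),
+// while false drops the entry entirely, making the table look unsynced.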
+func (tiflash *MockTiFlash) ResetSyncStatus(tableID int, canAvailable bool) { + tiflash.Lock() + defer tiflash.Unlock() + if canAvailable { + if z, ok := tiflash.SyncStatus[tableID]; ok { + z.Regions = []int{1} + tiflash.SyncStatus[tableID] = z + } else { + tiflash.SyncStatus[tableID] = mockTiFlashTableInfo{ + Regions: []int{1}, + Accel: false, + } + } + } else { + delete(tiflash.SyncStatus, tableID) + } +} + +// HandleDeletePlacementRule is mock function for DeleteTiFlashPlacementRule. +func (tiflash *MockTiFlash) HandleDeletePlacementRule(group string, ruleID string) { + tiflash.Lock() + defer tiflash.Unlock() + delete(tiflash.GlobalTiFlashPlacementRules, ruleID) +} + +// HandleGetGroupRules is mock function for GetTiFlashGroupRules. +func (tiflash *MockTiFlash) HandleGetGroupRules(group string) ([]placement.TiFlashRule, error) { + tiflash.Lock() + defer tiflash.Unlock() + var result = make([]placement.TiFlashRule, 0) + for _, item := range tiflash.GlobalTiFlashPlacementRules { + result = append(result, item) + } + return result, nil +} + +// HandlePostAccelerateSchedule is mock function for PostAccelerateSchedule +func (tiflash *MockTiFlash) HandlePostAccelerateSchedule(endKey string) error { + tiflash.Lock() + defer tiflash.Unlock() + tableID := helper.GetTiFlashTableIDFromEndKey(endKey) + + table, ok := tiflash.SyncStatus[int(tableID)] + if ok { + table.Accel = true + tiflash.SyncStatus[int(tableID)] = table + } else { + tiflash.SyncStatus[int(tableID)] = mockTiFlashTableInfo{ + Regions: []int{}, + Accel: true, + } + } + return nil +} + +// HandleGetPDRegionRecordStats is mock function for GetRegionCountFromPD. +// It currently always returns 1 Region for convenience. +func (tiflash *MockTiFlash) HandleGetPDRegionRecordStats(_ int64) helper.PDRegionStats { + return helper.PDRegionStats{ + Count: 1, + } +} + +// AddStore is mock function for adding store info into MockTiFlash. +func (tiflash *MockTiFlash) AddStore(storeID uint64, address string) { + tiflash.StoreInfo[storeID] = helper.StoreBaseStat{ + ID: int64(storeID), + Address: address, + State: 0, + StateName: "Up", + Version: "4.0.0-alpha", + StatusAddress: tiflash.StatusAddr, + GitHash: "mock-tikv-githash", + StartTimestamp: tiflash.StartTime.Unix(), + Labels: []helper.StoreLabel{{ + Key: "engine", + Value: "tiflash", + }}, + } +} + +// HandleGetStoresStat is mock function for GetStoresStat. +// It returns address of our mocked TiFlash server. 
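+// When no store has been added via AddStore, it falls back to a single built-in store at
+// 127.0.0.1:3930 labeled engine=tiflash.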
+func (tiflash *MockTiFlash) HandleGetStoresStat() *helper.StoresStat { + tiflash.Lock() + defer tiflash.Unlock() + if len(tiflash.StoreInfo) == 0 { + // default Store + return &helper.StoresStat{ + Count: 1, + Stores: []helper.StoreStat{ + { + Store: helper.StoreBaseStat{ + ID: 1, + Address: "127.0.0.1:3930", + State: 0, + StateName: "Up", + Version: "4.0.0-alpha", + StatusAddress: tiflash.StatusAddr, + GitHash: "mock-tikv-githash", + StartTimestamp: tiflash.StartTime.Unix(), + Labels: []helper.StoreLabel{{ + Key: "engine", + Value: "tiflash", + }}, + }, + }, + }, + } + } + stores := make([]helper.StoreStat, 0, len(tiflash.StoreInfo)) + for _, storeInfo := range tiflash.StoreInfo { + stores = append(stores, helper.StoreStat{Store: storeInfo, Status: helper.StoreDetailStat{}}) + } + return &helper.StoresStat{ + Count: len(tiflash.StoreInfo), + Stores: stores, + } +} + +// SetRuleGroupIndex sets the group index of tiflash +func (tiflash *MockTiFlash) SetRuleGroupIndex(groupIndex int) { + tiflash.Lock() + defer tiflash.Unlock() + tiflash.groupIndex = groupIndex +} + +// GetRuleGroupIndex gets the group index of tiflash +func (tiflash *MockTiFlash) GetRuleGroupIndex() int { + tiflash.Lock() + defer tiflash.Unlock() + return tiflash.groupIndex +} + +// Compare supposed rule, and we actually get from TableInfo +func isRuleMatch(rule placement.TiFlashRule, startKey string, endKey string, count int, labels []string) bool { + // Compute startKey + if rule.StartKeyHex == startKey && rule.EndKeyHex == endKey { + ok := false + for _, c := range rule.Constraints { + if c.Key == "engine" && len(c.Values) == 1 && c.Values[0] == "tiflash" && c.Op == placement.In { + ok = true + break + } + } + if !ok { + return false + } + + if len(rule.LocationLabels) == len(labels) { + for i, lb := range labels { + if lb != rule.LocationLabels[i] { + return false + } + } + } else { + return false + } + + if rule.Count != count { + return false + } + if rule.Role != placement.Learner { + return false + } + } else { + return false + } + return true +} + +// CheckPlacementRule find if a given rule precisely matches already set rules. +func (tiflash *MockTiFlash) CheckPlacementRule(rule placement.TiFlashRule) bool { + tiflash.Lock() + defer tiflash.Unlock() + for _, r := range tiflash.GlobalTiFlashPlacementRules { + if isRuleMatch(rule, r.StartKeyHex, r.EndKeyHex, r.Count, r.LocationLabels) { + return true + } + } + return false +} + +// GetPlacementRule find a rule by name. +func (tiflash *MockTiFlash) GetPlacementRule(ruleName string) (*placement.TiFlashRule, bool) { + tiflash.Lock() + defer tiflash.Unlock() + if r, ok := tiflash.GlobalTiFlashPlacementRules[ruleName]; ok { + p := r + return &p, ok + } + return nil, false +} + +// CleanPlacementRules cleans all placement rules. +func (tiflash *MockTiFlash) CleanPlacementRules() { + tiflash.Lock() + defer tiflash.Unlock() + tiflash.GlobalTiFlashPlacementRules = make(map[string]placement.TiFlashRule) +} + +// PlacementRulesLen gets length of all currently set placement rules. +func (tiflash *MockTiFlash) PlacementRulesLen() int { + tiflash.Lock() + defer tiflash.Unlock() + return len(tiflash.GlobalTiFlashPlacementRules) +} + +// GetTableSyncStatus returns table sync status by given tableID. +func (tiflash *MockTiFlash) GetTableSyncStatus(tableID int) (*mockTiFlashTableInfo, bool) { + tiflash.Lock() + defer tiflash.Unlock() + if r, ok := tiflash.SyncStatus[tableID]; ok { + p := r + return &p, ok + } + return nil, false +} + +// PdSwitch controls if pd is enabled. 
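+// While PD is disabled, HandleSetPlacementRule returns early without recording the rule or
+// creating a mock region, so replicas never become available.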
+func (tiflash *MockTiFlash) PdSwitch(enabled bool) { + tiflash.Lock() + defer tiflash.Unlock() + tiflash.PdEnabled = enabled +} + +// CalculateTiFlashProgress return truncated string to avoid float64 comparison. +func (m *mockTiFlashReplicaManagerCtx) CalculateTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores map[int64]helper.StoreStat) (float64, error) { + return calculateTiFlashProgress(tableID, replicaCount, tiFlashStores) +} + +// UpdateTiFlashProgressCache updates tiflashProgressCache +func (m *mockTiFlashReplicaManagerCtx) UpdateTiFlashProgressCache(tableID int64, progress float64) { + m.Lock() + defer m.Unlock() + m.tiflashProgressCache[tableID] = progress +} + +// GetTiFlashProgressFromCache gets tiflash replica progress from tiflashProgressCache +func (m *mockTiFlashReplicaManagerCtx) GetTiFlashProgressFromCache(tableID int64) (float64, bool) { + m.RLock() + defer m.RUnlock() + progress, ok := m.tiflashProgressCache[tableID] + return progress, ok +} + +// DeleteTiFlashProgressFromCache delete tiflash replica progress from tiflashProgressCache +func (m *mockTiFlashReplicaManagerCtx) DeleteTiFlashProgressFromCache(tableID int64) { + m.Lock() + defer m.Unlock() + delete(m.tiflashProgressCache, tableID) +} + +// CleanTiFlashProgressCache clean progress cache +func (m *mockTiFlashReplicaManagerCtx) CleanTiFlashProgressCache() { + m.Lock() + defer m.Unlock() + m.tiflashProgressCache = make(map[int64]float64) +} + +// SetMockTiFlash is set a mock TiFlash server. +func (m *mockTiFlashReplicaManagerCtx) SetMockTiFlash(tiflash *MockTiFlash) { + m.Lock() + defer m.Unlock() + m.tiflash = tiflash +} + +// SetTiFlashGroupConfig sets the tiflash's rule group config +func (m *mockTiFlashReplicaManagerCtx) SetTiFlashGroupConfig(_ context.Context) error { + m.Lock() + defer m.Unlock() + if m.tiflash == nil { + return nil + } + m.tiflash.SetRuleGroupIndex(placement.RuleIndexTiFlash) + return nil +} + +// SetPlacementRule is a helper function to set placement rule. +func (m *mockTiFlashReplicaManagerCtx) SetPlacementRule(ctx context.Context, rule placement.TiFlashRule) error { + m.Lock() + defer m.Unlock() + if m.tiflash == nil { + return nil + } + return m.tiflash.HandleSetPlacementRule(rule) +} + +// DeletePlacementRule is to delete placement rule for certain group. +func (m *mockTiFlashReplicaManagerCtx) DeletePlacementRule(ctx context.Context, group string, ruleID string) error { + m.Lock() + defer m.Unlock() + if m.tiflash == nil { + return nil + } + logutil.BgLogger().Info("Remove TiFlash rule", zap.String("ruleID", ruleID)) + m.tiflash.HandleDeletePlacementRule(group, ruleID) + return nil +} + +// GetGroupRules to get all placement rule in a certain group. +func (m *mockTiFlashReplicaManagerCtx) GetGroupRules(ctx context.Context, group string) ([]placement.TiFlashRule, error) { + m.Lock() + defer m.Unlock() + if m.tiflash == nil { + return []placement.TiFlashRule{}, nil + } + return m.tiflash.HandleGetGroupRules(group) +} + +// PostAccelerateSchedule sends `regions/accelerate-schedule` request. +func (m *mockTiFlashReplicaManagerCtx) PostAccelerateSchedule(ctx context.Context, tableID int64) error { + m.Lock() + defer m.Unlock() + if m.tiflash == nil { + return nil + } + endKey := tablecodec.EncodeTablePrefix(tableID + 1) + endKey = codec.EncodeBytes([]byte{}, endKey) + return m.tiflash.HandlePostAccelerateSchedule(hex.EncodeToString(endKey)) +} + +// GetRegionCountFromPD is a helper function calling `/stats/region`. 
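+// The mock delegates to HandleGetPDRegionRecordStats, which always reports one region, so
+// progress computed against the mock reduces to peerCount / replicaCount.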
+func (m *mockTiFlashReplicaManagerCtx) GetRegionCountFromPD(ctx context.Context, tableID int64, regionCount *int) error { + m.Lock() + defer m.Unlock() + if m.tiflash == nil { + return nil + } + stats := m.tiflash.HandleGetPDRegionRecordStats(tableID) + *regionCount = stats.Count + return nil +} + +// GetStoresStat gets the TiKV store information by accessing PD's api. +func (m *mockTiFlashReplicaManagerCtx) GetStoresStat(ctx context.Context) (*helper.StoresStat, error) { + m.Lock() + defer m.Unlock() + if m.tiflash == nil { + return nil, &MockTiFlashError{"MockTiFlash is not accessible"} + } + return m.tiflash.HandleGetStoresStat(), nil +} + +// Close is called to close mockTiFlashReplicaManager. +func (m *mockTiFlashReplicaManagerCtx) Close(ctx context.Context) { + m.Lock() + defer m.Unlock() + if m.tiflash == nil { + return + } + if m.tiflash.StatusServer != nil { + m.tiflash.StatusServer.Close() + } +} + +// MockTiFlashError represents MockTiFlash error +type MockTiFlashError struct { + Message string +} + +func (me *MockTiFlashError) Error() string { + return me.Message +}
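+
+// A minimal sketch of driving the mock manager directly (hypothetical; the real tests go
+// through the infosync globals and createTiFlashContext):
+//
+//	m := &mockTiFlashReplicaManagerCtx{tiflashProgressCache: make(map[int64]float64)}
+//	m.SetMockTiFlash(NewMockTiFlash())
+//	_ = m.SetPlacementRule(context.Background(), *MakeNewRule(100, 1, nil))
+//	stores, _ := m.GetStoresStat(context.Background())
+//	_ = stores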