Merge branch 'master' into feature/dropping-multi-indexes
ou-bing authored Sep 1, 2021
2 parents 0c1bcd1 + b0c9d19 commit e2b6933
Showing 136 changed files with 6,084 additions and 2,167 deletions.
29 changes: 29 additions & 0 deletions .github/workflows/bug-closed.yml
@@ -0,0 +1,29 @@
+name: Bug Closed
+
+on:
+  issues:
+    types:
+      - closed
+
+jobs:
+  label_issues:
+    if: |
+      contains(github.event.issue.labels.*.name, 'type/bug') &&
+      !(contains(join(github.event.issue.labels.*.name, ', '), 'affects-') &&
+      contains(join(github.event.issue.labels.*.name, ', '), 'backport-'))
+    runs-on: ubuntu-latest
+    permissions:
+      issues: write
+    steps:
+      - name: Label issues
+        uses: andymckay/[email protected]
+        with:
+          add-labels: "needs-more-info"
+          repo-token: ${{ secrets.GITHUB_TOKEN }}
+      - name: Add comment
+        uses: peter-evans/[email protected]
+        with:
+          issue-number: ${{ github.event.issue.number }}
+          body: |
+            Please check whether the issue should be labeled with 'affects-x.y' or 'backport-x.y.z',
+            and then remove the 'needs-more-info' label.
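Editor's note: for readers less familiar with GitHub Actions expressions, here is a hedged Go translation of the `if` gate above. The function name needsMoreInfo is ours, not part of the workflow; the point is that contains() on the label array is an exact match, while contains(join(...)) is a substring test, so the job fires only for closed 'type/bug' issues missing an 'affects-' or 'backport-' label.

package main

import (
    "fmt"
    "strings"
)

// needsMoreInfo mirrors the workflow condition: the issue must carry the
// exact label "type/bug" and lack either an affects-* or a backport-* label.
func needsMoreInfo(labels []string) bool {
    isBug := false
    for _, l := range labels {
        if l == "type/bug" {
            isBug = true
        }
    }
    joined := strings.Join(labels, ", ")
    hasAffects := strings.Contains(joined, "affects-")
    hasBackport := strings.Contains(joined, "backport-")
    return isBug && !(hasAffects && hasBackport)
}

func main() {
    fmt.Println(needsMoreInfo([]string{"type/bug"}))                                  // true: needs triage labels
    fmt.Println(needsMoreInfo([]string{"type/bug", "affects-5.2", "backport-5.2.1"})) // false: fully triaged
}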
20 changes: 0 additions & 20 deletions .github/workflows/issue_assigned.yml

This file was deleted.

15 changes: 8 additions & 7 deletions br/pkg/backup/client.go
@@ -268,10 +268,6 @@ func BuildBackupRangeAndSchema(
             continue
         }
 
-        idAlloc := autoid.NewAllocator(storage, dbInfo.ID, false, autoid.RowIDAllocType)
-        seqAlloc := autoid.NewAllocator(storage, dbInfo.ID, false, autoid.SequenceType)
-        randAlloc := autoid.NewAllocator(storage, dbInfo.ID, false, autoid.AutoRandomType)
-
         tables, err := m.ListTables(dbInfo.ID)
         if err != nil {
             return nil, nil, errors.Trace(err)
@@ -294,14 +290,19 @@ func BuildBackupRangeAndSchema(
                 zap.String("table", tableInfo.Name.O),
             )
 
+            tblVer := autoid.AllocOptionTableInfoVersion(tableInfo.Version)
+            idAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.RowIDAllocType, tblVer)
+            seqAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.SequenceType, tblVer)
+            randAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.AutoRandomType, tblVer)
+
             var globalAutoID int64
             switch {
             case tableInfo.IsSequence():
-                globalAutoID, err = seqAlloc.NextGlobalAutoID(tableInfo.ID)
+                globalAutoID, err = seqAlloc.NextGlobalAutoID()
             case tableInfo.IsView() || !utils.NeedAutoID(tableInfo):
                 // no auto ID for views or tables without either a rowID or an auto_increment ID
             default:
-                globalAutoID, err = idAlloc.NextGlobalAutoID(tableInfo.ID)
+                globalAutoID, err = idAlloc.NextGlobalAutoID()
             }
             if err != nil {
                 return nil, nil, errors.Trace(err)
@@ -311,7 +312,7 @@
             if tableInfo.PKIsHandle && tableInfo.ContainsAutoRandomBits() {
                 // this table has auto_random id, we need backup and rebase in restoration
                 var globalAutoRandID int64
-                globalAutoRandID, err = randAlloc.NextGlobalAutoID(tableInfo.ID)
+                globalAutoRandID, err = randAlloc.NextGlobalAutoID()
                 if err != nil {
                     return nil, nil, errors.Trace(err)
                 }
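Editor's note: the hunks above, together with the panickingAllocator.Rebase change in br/pkg/lightning/backend/kv/allocator.go further down, show the autoid API moving from per-database to per-table allocators: NewAllocator now takes the table ID plus a table-info-version option, and NextGlobalAutoID/Rebase drop their tableID parameter. A minimal before/after sketch, assembled from the diff lines above rather than taken verbatim from the commit:

// Before this commit: one allocator per database; the table ID is passed per call.
idAlloc := autoid.NewAllocator(storage, dbInfo.ID, false, autoid.RowIDAllocType)
globalAutoID, err := idAlloc.NextGlobalAutoID(tableInfo.ID)

// After: the allocator is bound to one table (and its schema version) at construction.
tblVer := autoid.AllocOptionTableInfoVersion(tableInfo.Version)
idAlloc := autoid.NewAllocator(storage, dbInfo.ID, tableInfo.ID, false, autoid.RowIDAllocType, tblVer)
globalAutoID, err := idAlloc.NextGlobalAutoID()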
56 changes: 39 additions & 17 deletions br/pkg/conn/conn.go
@@ -5,6 +5,7 @@ package conn
 import (
     "context"
     "crypto/tls"
+    "fmt"
     "os"
     "sync"
     "time"
@@ -162,6 +163,27 @@ func GetAllTiKVStores(
     return stores[:j], nil
 }
 
+func checkStoresAlive(ctx context.Context,
+    pdclient pd.Client,
+    storeBehavior StoreBehavior) error {
+    // Check live tikv.
+    stores, err := GetAllTiKVStores(ctx, pdclient, storeBehavior)
+    if err != nil {
+        log.Error("fail to get store", zap.Error(err))
+        return errors.Trace(err)
+    }
+
+    liveStoreCount := 0
+    for _, s := range stores {
+        if s.GetState() != metapb.StoreState_Up {
+            continue
+        }
+        liveStoreCount++
+    }
+    log.Info("checked alive KV stores", zap.Int("aliveStores", liveStoreCount), zap.Int("totalStores", len(stores)))
+    return nil
+}
+
 // NewMgr creates a new Mgr.
 //
 // Domain is optional for Backup, set `needDomain` to false to disable
@@ -170,7 +192,6 @@ func NewMgr(
     ctx context.Context,
     g glue.Glue,
     pdAddrs string,
-    storage kv.Storage,
     tlsConf *tls.Config,
     securityOption pd.SecurityOption,
     keepalive keepalive.ClientParameters,
@@ -184,10 +205,7 @@
         ctx = opentracing.ContextWithSpan(ctx, span1)
     }
 
-    tikvStorage, ok := storage.(tikv.Storage)
-    if !ok {
-        return nil, berrors.ErrKVNotTiKV
-    }
+    log.Info("new mgr", zap.String("pdAddrs", pdAddrs))
 
     controller, err := pdutil.NewPdController(ctx, pdAddrs, tlsConf, securityOption)
     if err != nil {
@@ -201,20 +219,21 @@
             "if you believe it's OK, use --check-requirements=false to skip.")
         }
     }
-    log.Info("new mgr", zap.String("pdAddrs", pdAddrs))
 
-    // Check live tikv.
-    stores, err := GetAllTiKVStores(ctx, controller.GetPDClient(), storeBehavior)
+    err = checkStoresAlive(ctx, controller.GetPDClient(), storeBehavior)
     if err != nil {
-        log.Error("fail to get store", zap.Error(err))
         return nil, errors.Trace(err)
     }
-    liveStoreCount := 0
-    for _, s := range stores {
-        if s.GetState() != metapb.StoreState_Up {
-            continue
-        }
-        liveStoreCount++
-    }
 
+    // Disable GC because TiDB enables GC already.
+    storage, err := g.Open(fmt.Sprintf("tikv://%s?disableGC=true", pdAddrs), securityOption)
+    if err != nil {
+        return nil, errors.Trace(err)
+    }
+
+    tikvStorage, ok := storage.(tikv.Storage)
+    if !ok {
+        return nil, berrors.ErrKVNotTiKV
+    }
+
     var dom *domain.Domain
@@ -232,9 +251,12 @@
         dom: dom,
         tlsConf: tlsConf,
         ownsStorage: g.OwnsStorage(),
+        grpcClis: struct {
+            mu   sync.Mutex
+            clis map[uint64]*grpc.ClientConn
+        }{clis: make(map[uint64]*grpc.ClientConn)},
+        keepalive: keepalive,
     }
-    mgr.grpcClis.clis = make(map[uint64]*grpc.ClientConn)
-    mgr.keepalive = keepalive
     return mgr, nil
 }
 
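Editor's note: the net effect of the conn.go hunks is that NewMgr no longer accepts a kv.Storage argument. It opens the TiKV-backed storage itself, with GC disabled since TiDB already runs GC, and only after the PD connection and store-liveness checks succeed; the liveness check now lives in the standalone checkStoresAlive helper, exercised by the new TestCheckStoresAlive below. A minimal sketch of the resulting order of operations, condensed from the hunks above (not additional code from the commit):

// Condensed from the diff: connect to PD, verify live stores, then open storage.
controller, err := pdutil.NewPdController(ctx, pdAddrs, tlsConf, securityOption)
if err != nil {
    return nil, errors.Trace(err)
}
if err = checkStoresAlive(ctx, controller.GetPDClient(), storeBehavior); err != nil {
    return nil, errors.Trace(err)
}
// Disable GC because TiDB enables GC already.
storage, err := g.Open(fmt.Sprintf("tikv://%s?disableGC=true", pdAddrs), securityOption)
if err != nil {
    return nil, errors.Trace(err)
}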
57 changes: 57 additions & 0 deletions br/pkg/conn/conn_test.go
@@ -43,6 +43,63 @@ func (fpdc fakePDClient) GetAllStores(context.Context, ...pd.GetStoreOption) ([]
     return append([]*metapb.Store{}, fpdc.stores...), nil
 }
 
+func (s *testClientSuite) TestCheckStoresAlive(c *C) {
+    stores := []*metapb.Store{
+        {
+            Id:    1,
+            State: metapb.StoreState_Up,
+            Labels: []*metapb.StoreLabel{
+                {
+                    Key:   "engine",
+                    Value: "tiflash",
+                },
+            },
+        },
+        {
+            Id:    2,
+            State: metapb.StoreState_Offline,
+            Labels: []*metapb.StoreLabel{
+                {
+                    Key:   "engine",
+                    Value: "tiflash",
+                },
+            },
+        },
+        {
+            Id:    3,
+            State: metapb.StoreState_Up,
+            Labels: []*metapb.StoreLabel{
+                {
+                    Key:   "engine",
+                    Value: "tikv",
+                },
+            },
+        },
+        {
+            Id:    4,
+            State: metapb.StoreState_Offline,
+            Labels: []*metapb.StoreLabel{
+                {
+                    Key:   "engine",
+                    Value: "tikv",
+                },
+            },
+        },
+    }
+
+    fpdc := fakePDClient{
+        stores: stores,
+    }
+
+    kvStores, err := GetAllTiKVStores(s.ctx, fpdc, SkipTiFlash)
+    c.Assert(err, IsNil)
+    c.Assert(len(kvStores), Equals, 2)
+    c.Assert(kvStores, DeepEquals, stores[2:])
+
+    err = checkStoresAlive(s.ctx, fpdc, SkipTiFlash)
+    c.Assert(err, IsNil)
+}
+
 func (s *testClientSuite) TestGetAllTiKVStores(c *C) {
     testCases := []struct {
         stores []*metapb.Store
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/allocator.go
@@ -40,7 +40,7 @@ func NewPanickingAllocators(base int64) autoid.Allocators {
 }
 
 // Rebase implements the autoid.Allocator interface
-func (alloc *panickingAllocator) Rebase(tableID, newBase int64, allocIDs bool) error {
+func (alloc *panickingAllocator) Rebase(newBase int64, allocIDs bool) error {
     // CAS
     for {
         oldBase := atomic.LoadInt64(alloc.base)
8 changes: 1 addition & 7 deletions br/pkg/lightning/backend/local/local.go
@@ -1272,9 +1272,7 @@ func (local *local) allocateTSIfNotExists(ctx context.Context, engine *File) err
     return engine.saveEngineMeta()
 }
 
-// CloseEngine closes backend engine by uuid
-// NOTE: we will return nil if engine is not exist. This will happen if engine import&cleanup successfully
-// but exit before update checkpoint. Thus after restart, we will try to import this engine again.
+// CloseEngine closes backend engine by uuid.
 func (local *local) CloseEngine(ctx context.Context, cfg *backend.EngineConfig, engineUUID uuid.UUID) error {
     // flush the mem table to storage to free memory;
     // it looks unnecessary, but this way we can control memory usage precisely.
@@ -1283,10 +1281,6 @@ func (local *local) CloseEngine(ctx context.Context, cfg *backend.EngineConfig,
     // recovery mode, we should reopen this engine file
     db, err := local.openEngineDB(engineUUID, true)
     if err != nil {
-        // if engine db does not exist, just skip
-        if os.IsNotExist(errors.Cause(err)) {
-            return nil
-        }
         return err
     }
     engineFile := &File{
19 changes: 8 additions & 11 deletions br/pkg/lightning/checkpoints/checkpoints.go
@@ -490,7 +490,7 @@ type DB interface {
     // It assumes the entire table has not been imported before and will fill in
     // default values for the column permutations and checksums.
     InsertEngineCheckpoints(ctx context.Context, tableName string, checkpoints map[int32]*EngineCheckpoint) error
-    Update(checkpointDiffs map[string]*TableCheckpointDiff)
+    Update(checkpointDiffs map[string]*TableCheckpointDiff) error
 
     RemoveCheckpoint(ctx context.Context, tableName string) error
     // MoveCheckpoints renames the checkpoint schema to include a suffix
@@ -599,7 +599,9 @@ func (*NullCheckpointsDB) InsertEngineCheckpoints(_ context.Context, _ string, _
     return nil
 }
 
-func (*NullCheckpointsDB) Update(map[string]*TableCheckpointDiff) {}
+func (*NullCheckpointsDB) Update(map[string]*TableCheckpointDiff) error {
+    return nil
+}
 
 type MySQLCheckpointsDB struct {
     db *sql.DB
@@ -859,15 +861,15 @@ func (cpdb *MySQLCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tab
     return nil
 }
 
-func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff) {
+func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff) error {
     chunkQuery := fmt.Sprintf(UpdateChunkTemplate, cpdb.schema, CheckpointTableNameChunk)
     rebaseQuery := fmt.Sprintf(UpdateTableRebaseTemplate, cpdb.schema, CheckpointTableNameTable)
     tableStatusQuery := fmt.Sprintf(UpdateTableStatusTemplate, cpdb.schema, CheckpointTableNameTable)
     tableChecksumQuery := fmt.Sprintf(UpdateTableChecksumTemplate, cpdb.schema, CheckpointTableNameTable)
     engineStatusQuery := fmt.Sprintf(UpdateEngineTemplate, cpdb.schema, CheckpointTableNameEngine)
 
     s := common.SQLWithRetry{DB: cpdb.db, Logger: log.L()}
-    err := s.Transact(context.Background(), "update checkpoints", func(c context.Context, tx *sql.Tx) error {
+    return s.Transact(context.Background(), "update checkpoints", func(c context.Context, tx *sql.Tx) error {
         chunkStmt, e := tx.PrepareContext(c, chunkQuery)
         if e != nil {
             return errors.Trace(e)
@@ -933,9 +935,6 @@ func (cpdb *MySQLCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoi
 
         return nil
     })
-    if err != nil {
-        log.L().Error("save checkpoint failed", zap.Error(err))
-    }
 }
 
 type FileCheckpointsDB struct {
@@ -1165,7 +1164,7 @@ func (cpdb *FileCheckpointsDB) InsertEngineCheckpoints(_ context.Context, tableN
     return errors.Trace(cpdb.save())
 }
 
-func (cpdb *FileCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff) {
+func (cpdb *FileCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff) error {
     cpdb.lock.Lock()
     defer cpdb.lock.Unlock()
 
@@ -1200,9 +1199,7 @@ func (cpdb *FileCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpoin
         }
     }
 
-    if err := cpdb.save(); err != nil {
-        log.L().Error("save checkpoint failed", zap.Error(err))
-    }
+    return cpdb.save()
 }
 
 // Management functions ----------------------------------------------------------------------------
9 changes: 3 additions & 6 deletions br/pkg/lightning/checkpoints/glue_checkpoint.go
@@ -411,20 +411,20 @@ func (g GlueCheckpointsDB) InsertEngineCheckpoints(ctx context.Context, tableNam
     return errors.Trace(err)
 }
 
-func (g GlueCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff) {
+func (g GlueCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDiff) error {
     logger := log.L()
     se, err := g.getSessionFunc()
     if err != nil {
         log.L().Error("can't get a session to update GlueCheckpointsDB", zap.Error(errors.Trace(err)))
-        return
+        return err
     }
     defer se.Close()
 
     chunkQuery := fmt.Sprintf(UpdateChunkTemplate, g.schema, CheckpointTableNameChunk)
     rebaseQuery := fmt.Sprintf(UpdateTableRebaseTemplate, g.schema, CheckpointTableNameTable)
     tableStatusQuery := fmt.Sprintf(UpdateTableStatusTemplate, g.schema, CheckpointTableNameTable)
     engineStatusQuery := fmt.Sprintf(UpdateEngineTemplate, g.schema, CheckpointTableNameEngine)
-    err = Transact(context.Background(), "update checkpoints", se, logger, func(c context.Context, s Session) error {
+    return Transact(context.Background(), "update checkpoints", se, logger, func(c context.Context, s Session) error {
         chunkStmt, _, _, err := s.PrepareStmt(chunkQuery)
         if err != nil {
             return errors.Trace(err)
@@ -501,9 +501,6 @@ func (g GlueCheckpointsDB) Update(checkpointDiffs map[string]*TableCheckpointDif
         }
         return nil
     })
-    if err != nil {
-        log.L().Error("save checkpoint failed", zap.Error(err))
-    }
 }
 
 func (g GlueCheckpointsDB) RemoveCheckpoint(ctx context.Context, tableName string) error {
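Editor's note: across all four CheckpointsDB implementations (Null, MySQL, File, Glue), Update changes from logging save failures internally to returning the error. A hedged caller-side sketch; the variable names are illustrative and the commit's actual call sites are not shown in this truncated view:

// Callers can now decide how to react to a failed checkpoint save,
// e.g. reproduce the old behavior by logging, or abort the import:
if err := cpdb.Update(checkpointDiffs); err != nil {
    log.L().Error("save checkpoint failed", zap.Error(err))
    return errors.Trace(err)
}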