Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workloadrepo: add admin create workload snapshot stmt #58250

Merged
merged 4 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ go_library(
"update.go",
"utils.go",
"window.go",
"workloadrepo.go",
"write.go",
],
importpath = "github.com/pingcap/tidb/pkg/executor",
Expand Down
7 changes: 7 additions & 0 deletions pkg/executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ func (b *executorBuilder) build(p base.Plan) exec.Executor {
return b.buildExpand(v)
case *plannercore.RecommendIndexPlan:
return b.buildRecommendIndex(v)
case *plannercore.WorkloadRepoCreate:
return b.buildWorkloadRepoCreate(v)
default:
if mp, ok := p.(testutil.MockPhysicalPlan); ok {
return mp.GetExecutor()
Expand Down Expand Up @@ -5760,3 +5762,8 @@ func (b *executorBuilder) buildRecommendIndex(v *plannercore.RecommendIndexPlan)
Options: v.Options,
}
}

// buildWorkloadRepoCreate builds the executor for a WorkloadRepoCreate plan.
// The plan argument is unused; the returned executor only wraps a
// BaseExecutor created from the builder's session context.
func (b *executorBuilder) buildWorkloadRepoCreate(_ *plannercore.WorkloadRepoCreate) exec.Executor {
	// Build the literal directly with a keyed field instead of going through
	// a local named `base`, which would shadow the imported `base` package
	// used elsewhere in this file (e.g. base.Plan).
	return &WorkloadRepoCreateExec{
		BaseExecutor: exec.NewBaseExecutor(b.ctx, nil, 0),
	}
}
38 changes: 38 additions & 0 deletions pkg/executor/workloadrepo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"context"

"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/util/chunk"
)

// TakeSnapshot is a hook from workload repo that may trigger manual snapshot.
//
// The workloadrepo package imports this package and assigns the hook from its
// init function, so the dependency is inverted through this variable. The hook
// stays nil when nothing has assigned it; WorkloadRepoCreateExec.Next treats a
// nil hook as a no-op and otherwise returns the hook's error unchanged.
var TakeSnapshot func() error

// WorkloadRepoCreateExec indicates WorkloadRepoCreate executor.
// It holds no state besides the embedded BaseExecutor; all of its work is
// done by calling the package-level TakeSnapshot hook from Next.
type WorkloadRepoCreateExec struct {
	exec.BaseExecutor
}

// Next implements the Executor Next interface.
// It forwards to the TakeSnapshot hook and returns the hook's error as is.
// When no hook has been installed the call succeeds without doing anything.
// Neither the context nor the chunk is used.
func (*WorkloadRepoCreateExec) Next(context.Context, *chunk.Chunk) error {
	hook := TakeSnapshot
	if hook == nil {
		return nil
	}
	return hook()
}
3 changes: 3 additions & 0 deletions pkg/parser/ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2577,6 +2577,7 @@ const (
AdminShowBDRRole
AdminUnsetBDRRole
AdminAlterDDLJob
AdminWorkloadRepoCreate
)

// HandleRange represents a range where handle value >= Begin and < End.
Expand Down Expand Up @@ -2877,6 +2878,8 @@ func (n *AdminStmt) Restore(ctx *format.RestoreCtx) error {
return errors.Annotatef(err, "An error occurred while restore AdminStmt.AlterJobOptions[%d]", i)
}
}
case AdminWorkloadRepoCreate:
ctx.WriteKeyWord("CREATE WORKLOAD SNAPSHOT")
default:
return errors.New("Unsupported AdminStmt type")
}
Expand Down
9,060 changes: 4,534 additions & 4,526 deletions pkg/parser/parser.go

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions pkg/parser/parser.y
Original file line number Diff line number Diff line change
Expand Up @@ -11239,6 +11239,10 @@ AdminStmt:
Index: string($5),
}
}
| "ADMIN" "CREATE" "WORKLOAD" "SNAPSHOT"
{
$$ = &ast.AdminStmt{Tp: ast.AdminWorkloadRepoCreate}
}
| "ADMIN" "CLEANUP" "INDEX" TableName Identifier
{
$$ = &ast.AdminStmt{
Expand Down
1 change: 1 addition & 0 deletions pkg/parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ func TestAdminStmt(t *testing.T) {
{"admin show ddl job queries limit 3 offset 2", true, "ADMIN SHOW DDL JOB QUERIES LIMIT 2, 3"},
{"admin show ddl job queries limit 22 offset 0", true, "ADMIN SHOW DDL JOB QUERIES LIMIT 0, 22"},
{"admin show t1 next_row_id", true, "ADMIN SHOW `t1` NEXT_ROW_ID"},
{"admin create workload snapshot;", true, "ADMIN CREATE WORKLOAD SNAPSHOT"},
{"admin check table t1, t2;", true, "ADMIN CHECK TABLE `t1`, `t2`"},
{"admin check index tableName idxName;", true, "ADMIN CHECK INDEX `tableName` idxName"},
{"admin check index tableName idxName (1, 2), (4, 5);", true, "ADMIN CHECK INDEX `tableName` idxName (1,2), (4,5)"},
Expand Down
5 changes: 5 additions & 0 deletions pkg/planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ type AlterDDLJob struct {
Options []*AlterDDLJobOpt
}

// WorkloadRepoCreate is the plan of admin create workload snapshot.
// It declares no fields of its own besides the embedded baseSchemaProducer;
// the planner builds it as an empty value for ast.AdminWorkloadRepoCreate.
type WorkloadRepoCreate struct {
	baseSchemaProducer
}

// ReloadExprPushdownBlacklist reloads the data from expr_pushdown_blacklist table.
type ReloadExprPushdownBlacklist struct {
baseSchemaProducer
Expand Down
2 changes: 2 additions & 0 deletions pkg/planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1584,6 +1584,8 @@ func (b *PlanBuilder) buildAdmin(ctx context.Context, as *ast.AdminStmt) (base.P
if err != nil {
return nil, err
}
case ast.AdminWorkloadRepoCreate:
return &WorkloadRepoCreate{}, nil
default:
return nil, plannererrors.ErrUnsupportedType.GenWithStack("Unsupported ast.AdminStmt(%T) for buildAdmin", as)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/util/workloadrepo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
deps = [
"//pkg/domain",
"//pkg/domain/infosync",
"//pkg/executor",
"//pkg/infoschema",
"//pkg/kv",
"//pkg/meta/model",
Expand Down
12 changes: 8 additions & 4 deletions pkg/util/workloadrepo/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ import (
)

const (
ownerKey = "/tidb/workloadrepo/owner"
promptKey = "workloadrepo"
snapIDKey = "/tidb/workloadrepo/snap_id"
ownerKey = "/tidb/workloadrepo/owner"
promptKey = "workloadrepo"
snapIDKey = "/tidb/workloadrepo/snap_id"
snapCommandKey = "/tidb/workloadrepo/snap_command"

etcdOpTimeout = 5 * time.Second
snapCommandTake = "take_snapshot"

etcdOpTimeout = 5 * time.Second
snapshotRetries = 5

defSamplingInterval = 5
defSnapshotInterval = 3600
Expand Down
92 changes: 62 additions & 30 deletions pkg/util/workloadrepo/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (w *worker) etcdCreate(ctx context.Context, key, val string) error {
defer cancel()
res, err := w.etcdClient.Txn(ctx).
If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).
Then(clientv3.OpPut(snapIDKey, val)).
Then(clientv3.OpPut(key, val)).
Commit()
if err != nil {
return err
Expand Down Expand Up @@ -128,6 +128,53 @@ func (w *worker) snapshotTable(ctx context.Context, snapID uint64, rt *repositor
return nil
}

// takeSnapshot fires a snapshot when this instance is the owner, and
// otherwise optionally forwards the request through etcd.
//
// On a non-owner nothing happens unless sendCommand is true. In that case the
// current value of snapCommandKey is read and replaced with snapCommandTake
// via a compare-and-swap, so that instances watching that key see the request.
//
// On the owner, up to snapshotRetries attempts are made to:
//  1. read the current snap ID,
//  2. upsert a hist_snapshots row for snapID+1,
//  3. CAS the snap ID from snapID to snapID+1.
//
// Failures are only logged; the function has no return value. Retrying stops
// early once ctx is done, since further attempts cannot succeed.
func (w *worker) takeSnapshot(ctx context.Context, sess sessionctx.Context, sendCommand bool) {
	// coordination logic
	if !w.owner.IsOwner() {
		if !sendCommand {
			return
		}

		command, err := w.etcdGet(ctx, snapCommandKey, "")
		if err != nil {
			logutil.BgLogger().Info("workload repository cannot get current snapshot command", zap.NamedError("err", err))
			return
		}

		if err = w.etcdCAS(ctx, snapCommandKey, command, snapCommandTake); err != nil {
			logutil.BgLogger().Info("workload repository cannot send snapshot command", zap.NamedError("err", err))
		}
		return
	}

	for range snapshotRetries {
		// Do not burn the remaining attempts once the worker is shutting down.
		if ctx.Err() != nil {
			return
		}

		snapID, err := w.getSnapID(ctx)
		if err != nil {
			logutil.BgLogger().Info("workload repository cannot get current snapid", zap.NamedError("err", err))
			continue
		}
		newSnapID := snapID + 1

		// Use UPSERT to ensure this SQL doesn't fail on duplicate snapID.
		//
		// NOTE: In a highly unlikely corner case, there could be two owners.
		// This might occur if upsertHistSnapshot succeeds but updateSnapID fails
		// due to another owner winning the etcd CAS loop.
		// While undesirable, this scenario is acceptable since both owners would
		// likely share similar datetime values and same cluster version.
		if err := upsertHistSnapshot(ctx, sess, newSnapID); err != nil {
			logutil.BgLogger().Info("workload repository could not insert into hist_snapshots", zap.NamedError("err", err))
			continue
		}

		if err := w.updateSnapID(ctx, snapID, newSnapID); err != nil {
			// Log the ID we tried to move to, not the one we read.
			logutil.BgLogger().Info("workload repository cannot update current snapid", zap.Uint64("new_id", newSnapID), zap.NamedError("err", err))
			continue
		}

		logutil.BgLogger().Info("workload repository fired snapshot", zap.String("owner", w.instanceID), zap.Uint64("snapID", newSnapID))
		return
	}

	// Every attempt failed; make the give-up visible instead of returning silently.
	logutil.BgLogger().Info("workload repository gave up taking snapshot", zap.Int("retries", snapshotRetries))
}

func (w *worker) startSnapshot(_ctx context.Context) func() {
return func() {
w.Lock()
Expand All @@ -142,44 +189,25 @@ func (w *worker) startSnapshot(_ctx context.Context) func() {
// otherwise the watch channels won't be collected after the exit of this function
ctx, cancel := context.WithCancel(_ctx)
defer cancel()
wch := w.etcdClient.Watch(ctx, snapIDKey)
snapIDCh := w.etcdClient.Watch(ctx, snapIDKey)
snapCmdCh := w.etcdClient.Watch(ctx, snapCommandKey)

for {
select {
case <-ctx.Done():
return
case <-w.snapshotTicker.C:
// coordination logic
if !w.owner.IsOwner() {
case resp := <-snapCmdCh:
if len(resp.Events) < 1 {
continue
}

for range 5 {
snapID, err := w.getSnapID(ctx)
if err != nil {
logutil.BgLogger().Info("workload repository cannot get current snapid", zap.NamedError("err", err))
continue
}
// use upsert such that this SQL does not fail on duplicated snapID
//
// NOTE: in an almost impossible corner case, there may be two owners.
// maybe upsertHistSnapshot succeed and updateSnapID fail, because
// another owner won the etcd CAS loop.
// it is unwanted but acceptable. because two owners should share
// similar datetime, same cluster versions.
if err := upsertHistSnapshot(ctx, sess, snapID+1); err != nil {
logutil.BgLogger().Info("workload repository could not insert into hist_snapshots", zap.NamedError("err", err))
continue
}
err = w.updateSnapID(ctx, snapID, snapID+1)
if err != nil {
logutil.BgLogger().Info("workload repository cannot update current snapid", zap.Uint64("new_id", snapID), zap.NamedError("err", err))
continue
}
logutil.BgLogger().Info("workload repository fired snapshot", zap.String("owner", w.instanceID), zap.Uint64("snapID", snapID+1))
break
// same as snapID events
// we only catch the last event if possible
snapCommandStr := string(resp.Events[len(resp.Events)-1].Kv.Value)
if snapCommandStr == snapCommandTake {
w.takeSnapshot(ctx, sess, false)
}
case resp := <-wch:
case resp := <-snapIDCh:
if len(resp.Events) < 1 {
// since there is no event, we don't know the latest snapid either
// really should not happen except creation
Expand Down Expand Up @@ -218,6 +246,10 @@ func (w *worker) startSnapshot(_ctx context.Context) func() {
if err := updateHistSnapshot(ctx, sess, snapID, errs); err != nil {
logutil.BgLogger().Info("workload repository snapshot failed: could not update hist_snapshots", zap.NamedError("err", err))
}
case <-w.snapshotChan:
w.takeSnapshot(ctx, sess, true)
case <-w.snapshotTicker.C:
w.takeSnapshot(ctx, sess, false)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/util/workloadrepo/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (w *worker) createAllTables(ctx context.Context) error {
return createAllPartitions(ctx, sess, is)
}

// checkTablesExists will check if all tables are created and if the work is bootstrapped.
func (w *worker) checkTablesExists(ctx context.Context) bool {
_sessctx := w.getSessionWithRetry()
sess := _sessctx.(sessionctx.Context)
Expand Down
16 changes: 16 additions & 0 deletions pkg/util/workloadrepo/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/executor"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/owner"
"github.com/pingcap/tidb/pkg/sessionctx"
Expand Down Expand Up @@ -118,6 +119,7 @@ type worker struct {
samplingTicker *time.Ticker
snapshotInterval int32
snapshotTicker *time.Ticker
snapshotChan chan struct{}
retentionDays int32
}

Expand All @@ -127,7 +129,17 @@ var workerCtx = worker{
retentionDays: defRententionDays,
}

// takeSnapshot asks the running worker to take a manual snapshot. It is the
// function installed as executor.TakeSnapshot.
//
// It returns an error when the worker has no snapshot channel (the channel is
// created in start and cleared in stop). Otherwise it queues one request and
// returns nil without waiting for the snapshot itself.
func takeSnapshot() error {
	// Read the field once so the nil check and the send use the same channel
	// even if stop() clears the field in between; sending on a nil channel
	// would block forever.
	// NOTE(review): this read is not synchronized with start/stop — confirm
	// whether it should be guarded by the worker's lock.
	ch := workerCtx.snapshotChan
	if ch == nil {
		return errors.New("Workload repository is not enabled yet")
	}

	select {
	case ch <- struct{}{}:
	default:
		// The single-slot buffer already holds a pending request, so this
		// one is coalesced with it rather than blocking the caller.
	}
	return nil
}

func init() {
executor.TakeSnapshot = takeSnapshot

variable.RegisterSysVar(&variable.SysVar{
Scope: variable.ScopeGlobal,
Name: repositoryDest,
Expand Down Expand Up @@ -336,6 +348,7 @@ func (w *worker) start() error {
}

_ = stmtsummary.StmtSummaryByDigestMap.SetHistoryEnabled(false)
w.snapshotChan = make(chan struct{}, 1)
ctx, cancel := context.WithCancel(context.Background())
w.cancel = cancel
w.wg.RunWithRecover(w.startRepository(ctx), func(err any) {
Expand All @@ -344,6 +357,7 @@ func (w *worker) start() error {
return nil
}

// stop will stop the worker.
func (w *worker) stop() {
w.enabled = false

Expand All @@ -362,8 +376,10 @@ func (w *worker) stop() {
}

w.cancel = nil
w.snapshotChan = nil
}

// setRepositoryDest will change the dest of workload snapshot.
func (w *worker) setRepositoryDest(_ context.Context, dst string) error {
w.Lock()
defer w.Unlock()
Expand Down
Loading