From ff794e5ce0352d2a6c2604a05db41e1c469e6e10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Wed, 6 Mar 2024 18:39:35 +0800 Subject: [PATCH] operator: print callstack when exiting before preparing done (#51371) close pingcap/tidb#51370 --- br/cmd/br/main.go | 26 +----------- br/pkg/backup/prepare_snap/prepare.go | 3 +- br/pkg/task/operator/cmd.go | 2 + br/pkg/utils/misc.go | 57 +++++++++++++++++++++++++++ 4 files changed, 63 insertions(+), 25 deletions(-) diff --git a/br/cmd/br/main.go b/br/cmd/br/main.go index 5eca340f1e622..f745920f5bfba 100644 --- a/br/cmd/br/main.go +++ b/br/cmd/br/main.go @@ -2,39 +2,17 @@ package main import ( "context" - "fmt" "os" - "os/signal" - "syscall" "github.com/pingcap/log" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/spf13/cobra" "go.uber.org/zap" ) func main() { gCtx := context.Background() - ctx, cancel := context.WithCancel(gCtx) - defer cancel() - - sc := make(chan os.Signal, 1) - signal.Notify(sc, - syscall.SIGHUP, - syscall.SIGINT, - syscall.SIGTERM, - syscall.SIGQUIT) - - go func() { - sig := <-sc - fmt.Printf("\nGot signal [%v] to exit.\n", sig) - log.Warn("received signal to exit", zap.Stringer("signal", sig)) - cancel() - fmt.Fprintln(os.Stderr, "gracefully shuting down, press ^C again to force exit") - <-sc - // Even user use SIGTERM to exit, there isn't any checkpoint for resuming, - // hence returning fail exit code. - os.Exit(1) - }() + ctx, cancel := utils.StartExitSingleListener(gCtx) rootCmd := &cobra.Command{ Use: "br", diff --git a/br/pkg/backup/prepare_snap/prepare.go b/br/pkg/backup/prepare_snap/prepare.go index 46f1916873831..f3ccdff2b1163 100644 --- a/br/pkg/backup/prepare_snap/prepare.go +++ b/br/pkg/backup/prepare_snap/prepare.go @@ -370,7 +370,9 @@ func (p *Preparer) workOnPendingRanges(ctx context.Context) error { } func (p *Preparer) sendWaitApply(ctx context.Context, reqs pendingRequests) error { + logutil.CL(ctx).Info("about to send wait apply to stores", zap.Int("to-stores", len(reqs))) for store, req := range reqs { + logutil.CL(ctx).Info("sending wait apply requests to store", zap.Uint64("store", store), zap.Int("regions", len(req.Regions))) stream, err := p.streamOf(ctx, store) if err != nil { return errors.Annotatef(err, "failed to dial the store %d", store) @@ -379,7 +381,6 @@ func (p *Preparer) sendWaitApply(ctx context.Context, reqs pendingRequests) erro if err != nil { return errors.Annotatef(err, "failed to send message to the store %d", store) } - logutil.CL(ctx).Info("sent wait apply requests to store", zap.Uint64("store", store), zap.Int("regions", len(req.Regions))) } return nil } diff --git a/br/pkg/task/operator/cmd.go b/br/pkg/task/operator/cmd.go index 40048721f4868..2b0157a699036 100644 --- a/br/pkg/task/operator/cmd.go +++ b/br/pkg/task/operator/cmd.go @@ -106,6 +106,7 @@ func hintAllReady() { // AdaptEnvForSnapshotBackup blocks the current goroutine and pause the GC safepoint and remove the scheduler by the config. // This function will block until the context being canceled. func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { + utils.DumpGoroutineWhenExit.Store(true) mgr, err := dialPD(ctx, &cfg.Config) if err != nil { return errors.Annotate(err, "failed to dial PD") @@ -141,6 +142,7 @@ func AdaptEnvForSnapshotBackup(ctx context.Context, cfg *PauseGcConfig) error { if cfg.OnAllReady != nil { cfg.OnAllReady() } + utils.DumpGoroutineWhenExit.Store(false) hintAllReady() }() defer func() { diff --git a/br/pkg/utils/misc.go b/br/pkg/utils/misc.go index 84774af0906a1..c351f62011a76 100644 --- a/br/pkg/utils/misc.go +++ b/br/pkg/utils/misc.go @@ -16,8 +16,16 @@ package utils import ( "context" "crypto/tls" + "fmt" + "os" + "os/signal" + "runtime" + "strings" + "sync/atomic" + "syscall" "time" + "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/log" @@ -157,3 +165,52 @@ func WithCleanUp(errOut *error, timeout time.Duration, fn func(context.Context) log.Warn("Encountered but ignored error while cleaning up.", zap.Error(err)) } } + +func AllStackInfo() []byte { + res := make([]byte, 256*units.KiB) + for { + n := runtime.Stack(res, true) + if n < len(res) { + return res[:n] + } + res = make([]byte, len(res)*2) + } +} + +var ( + DumpGoroutineWhenExit atomic.Bool +) + +func StartExitSingleListener(ctx context.Context) (context.Context, context.CancelFunc) { + cx, cancel := context.WithCancel(ctx) + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + go func() { + sig := <-sc + dumpGoroutine := DumpGoroutineWhenExit.Load() + padding := strings.Repeat("=", 8) + printDelimate := func(s string) { + fmt.Printf("%s[ %s ]%s\n", padding, s, padding) + } + fmt.Println() + printDelimate(fmt.Sprintf("Got signal %v to exit.", sig)) + printDelimate(fmt.Sprintf("Required Goroutine Dump = %v", dumpGoroutine)) + if dumpGoroutine { + printDelimate("Start Dumping Goroutine") + _, _ = os.Stdout.Write(AllStackInfo()) + printDelimate("End of Dumping Goroutine") + } + log.Warn("received signal to exit", zap.Stringer("signal", sig)) + cancel() + fmt.Fprintln(os.Stderr, "gracefully shutting down, press ^C again to force exit") + <-sc + // Even user use SIGTERM to exit, there isn't any checkpoint for resuming, + // hence returning fail exit code. + os.Exit(1) + }() + return cx, cancel +}