From 0e42a50f18c7c0712aae8801907133520c6d659f Mon Sep 17 00:00:00 2001
From: Ryan Leung
Date: Mon, 17 Apr 2023 14:27:18 +0800
Subject: [PATCH] *: fix the missing log panic (#6325)

close tikv/pd#6257

Signed-off-by: Ryan Leung
---
 pkg/election/lease.go                      | 2 ++
 pkg/schedule/labeler/labeler.go            | 3 +++
 pkg/statistics/buckets/hot_bucket_cache.go | 2 ++
 pkg/statistics/hot_cache.go                | 3 +++
 pkg/storage/endpoint/gc_key_space.go       | 5 +++++
 pkg/storage/hot_region_storage.go          | 5 +++++
 pkg/storage/leveldb_backend.go             | 3 +++
 pkg/tso/allocator_manager.go               | 2 ++
 pkg/tso/global_allocator.go                | 1 +
 pkg/tso/keyspace_group_manager.go          | 3 +++
 pkg/tso/local_allocator.go                 | 2 ++
 pkg/utils/metricutil/metricutil.go         | 3 +++
 pkg/utils/tsoutil/tso_dispatcher.go        | 5 +++++
 server/cluster/coordinator.go              | 1 +
 server/grpc_service.go                     | 9 ++++++++-
 15 files changed, 48 insertions(+), 1 deletion(-)

diff --git a/pkg/election/lease.go b/pkg/election/lease.go
index 99936a39ad1..f7542bff042 100644
--- a/pkg/election/lease.go
+++ b/pkg/election/lease.go
@@ -95,6 +95,8 @@ func (l *lease) IsExpired() bool {
 
 // KeepAlive auto renews the lease and update expireTime.
 func (l *lease) KeepAlive(ctx context.Context) {
+    defer logutil.LogPanic()
+
     if l == nil {
         return
     }
diff --git a/pkg/schedule/labeler/labeler.go b/pkg/schedule/labeler/labeler.go
index 01b701e59f8..e80a75fc904 100644
--- a/pkg/schedule/labeler/labeler.go
+++ b/pkg/schedule/labeler/labeler.go
@@ -25,6 +25,7 @@ import (
     "github.com/tikv/pd/pkg/errs"
     "github.com/tikv/pd/pkg/schedule/rangelist"
     "github.com/tikv/pd/pkg/storage/endpoint"
+    "github.com/tikv/pd/pkg/utils/logutil"
     "github.com/tikv/pd/pkg/utils/syncutil"
     "go.uber.org/zap"
 )
@@ -56,6 +57,8 @@ func NewRegionLabeler(ctx context.Context, storage endpoint.RuleStorage, gcInter
 }
 
 func (l *RegionLabeler) doGC(gcInterval time.Duration) {
+    defer logutil.LogPanic()
+
     ticker := time.NewTicker(gcInterval)
     defer ticker.Stop()
     for {
diff --git a/pkg/statistics/buckets/hot_bucket_cache.go b/pkg/statistics/buckets/hot_bucket_cache.go
index 4de3ec99f7c..15148e2dd97 100644
--- a/pkg/statistics/buckets/hot_bucket_cache.go
+++ b/pkg/statistics/buckets/hot_bucket_cache.go
@@ -150,6 +150,8 @@ func (h *HotBucketCache) CheckAsync(task flowBucketsItemTask) bool {
 }
 
 func (h *HotBucketCache) schedule() {
+    defer logutil.LogPanic()
+
     for {
         select {
         case <-h.ctx.Done():
diff --git a/pkg/statistics/hot_cache.go b/pkg/statistics/hot_cache.go
index f95683fbb57..42c1e6c49a7 100644
--- a/pkg/statistics/hot_cache.go
+++ b/pkg/statistics/hot_cache.go
@@ -19,6 +19,7 @@ import (
 
     "github.com/smallnest/chanx"
     "github.com/tikv/pd/pkg/core"
+    "github.com/tikv/pd/pkg/utils/logutil"
 )
 
 const chanMaxLength = 6000000
@@ -129,6 +130,8 @@ func (w *HotCache) ResetMetrics() {
 }
 
 func (w *HotCache) updateItems(queue *chanx.UnboundedChan[FlowItemTask], runTask func(task FlowItemTask)) {
+    defer logutil.LogPanic()
+
     for {
         select {
         case <-w.ctx.Done():
diff --git a/pkg/storage/endpoint/gc_key_space.go b/pkg/storage/endpoint/gc_key_space.go
index 66bf505583a..039c314cc30 100644
--- a/pkg/storage/endpoint/gc_key_space.go
+++ b/pkg/storage/endpoint/gc_key_space.go
@@ -25,6 +25,7 @@ import (
     "github.com/pingcap/failpoint"
     "github.com/pingcap/log"
     "github.com/tikv/pd/pkg/errs"
+    "github.com/tikv/pd/pkg/utils/logutil"
     "go.etcd.io/etcd/clientv3"
     "go.uber.org/zap"
 )
@@ -77,6 +78,8 @@ func (se *StorageEndpoint) LoadServiceSafePoint(spaceID, serviceID string) (*Ser
     }
     if ssp.ExpiredAt < time.Now().Unix() {
         go func() {
+            defer logutil.LogPanic()
+
             if err = se.Remove(key); err != nil {
                 log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err))
             }
@@ -124,6 +127,8 @@ func (se *StorageEndpoint) LoadMinServiceSafePoint(spaceID string, now time.Time
     })
     // remove expired keys asynchronously
     go func() {
+        defer logutil.LogPanic()
+
         for _, key := range expiredKeys {
             if err = se.Remove(key); err != nil {
                 log.Error("remove expired key meet error", zap.String("key", key), errs.ZapError(err))
diff --git a/pkg/storage/hot_region_storage.go b/pkg/storage/hot_region_storage.go
index 8ad09bbe853..62f8f5bb81f 100644
--- a/pkg/storage/hot_region_storage.go
+++ b/pkg/storage/hot_region_storage.go
@@ -34,6 +34,7 @@ import (
     "github.com/tikv/pd/pkg/encryption"
     "github.com/tikv/pd/pkg/errs"
     "github.com/tikv/pd/pkg/storage/kv"
+    "github.com/tikv/pd/pkg/utils/logutil"
     "github.com/tikv/pd/pkg/utils/syncutil"
     "go.uber.org/zap"
 )
@@ -162,6 +163,8 @@ func NewHotRegionsStorage(
 
 // Delete hot region whose update_time is smaller than time.Now() minus remain day in the background.
 func (h *HotRegionStorage) backgroundDelete() {
+    defer logutil.LogPanic()
+
     // make delete happened in defaultDeleteTime clock.
     now := time.Now()
     next := time.Date(now.Year(), now.Month(), now.Day(), defaultDeleteTime, 0, 0, 0, now.Location())
@@ -198,6 +201,8 @@ func (h *HotRegionStorage) backgroundDelete() {
 
 // Write hot_region info into db in the background.
 func (h *HotRegionStorage) backgroundFlush() {
+    defer logutil.LogPanic()
+
     interval := h.getCurInterval()
     ticker := time.NewTicker(interval)
     defer func() {
diff --git a/pkg/storage/leveldb_backend.go b/pkg/storage/leveldb_backend.go
index 25044008f6e..d25044e9c20 100644
--- a/pkg/storage/leveldb_backend.go
+++ b/pkg/storage/leveldb_backend.go
@@ -27,6 +27,7 @@ import (
     "github.com/tikv/pd/pkg/errs"
     "github.com/tikv/pd/pkg/storage/endpoint"
     "github.com/tikv/pd/pkg/storage/kv"
+    "github.com/tikv/pd/pkg/utils/logutil"
     "github.com/tikv/pd/pkg/utils/syncutil"
 )
 
@@ -80,6 +81,8 @@ func newLevelDBBackend(
 var dirtyFlushTick = time.Second
 
 func (lb *levelDBBackend) backgroundFlush() {
+    defer logutil.LogPanic()
+
     var (
         isFlush bool
         err     error
diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go
index 2c674039296..2c98fcb5988 100644
--- a/pkg/tso/allocator_manager.go
+++ b/pkg/tso/allocator_manager.go
@@ -940,6 +940,8 @@ func (am *AllocatorManager) GetLocalTSOSuffixPath(dcLocation string) string {
 // 2. If all PD servers with dc-location="dc-1" are down, then the other PD servers
 //    of DC could be elected.
 func (am *AllocatorManager) PriorityChecker() {
+    defer logutil.LogPanic()
+
     serverID := am.member.ID()
     myServerDCLocation := am.getServerDCLocation(serverID)
     // Check all Local TSO Allocator followers to see if their priorities is higher than the leaders
diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go
index 67e398bbcee..3badc064190 100644
--- a/pkg/tso/global_allocator.go
+++ b/pkg/tso/global_allocator.go
@@ -475,6 +475,7 @@ func (gta *GlobalTSOAllocator) Reset() {
 }
 
 func (gta *GlobalTSOAllocator) primaryElectionLoop() {
+    defer logutil.LogPanic()
     defer gta.wg.Done()
 
     for {
diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go
index 506b36dff58..dc6773efdfe 100644
--- a/pkg/tso/keyspace_group_manager.go
+++ b/pkg/tso/keyspace_group_manager.go
@@ -82,6 +82,7 @@ func (s *state) deinitialize() {
         if am != nil {
             wg.Add(1)
             go func(am *AllocatorManager) {
+                defer logutil.LogPanic()
                 defer wg.Done()
                 am.close()
                 log.Info("keyspace group closed", zap.Uint32("keyspace-group-id", am.kgID))
@@ -274,6 +275,8 @@ func (kgm *KeyspaceGroupManager) Close() {
 }
 
 func (kgm *KeyspaceGroupManager) checkInitProgress(ctx context.Context, cancel context.CancelFunc, done chan struct{}) {
+    defer logutil.LogPanic()
+
     select {
     case <-done:
         return
diff --git a/pkg/tso/local_allocator.go b/pkg/tso/local_allocator.go
index 9b0d1dc1869..61908fe39a7 100644
--- a/pkg/tso/local_allocator.go
+++ b/pkg/tso/local_allocator.go
@@ -25,6 +25,7 @@ import (
     "github.com/pingcap/log"
     "github.com/tikv/pd/pkg/election"
     "github.com/tikv/pd/pkg/errs"
+    "github.com/tikv/pd/pkg/utils/logutil"
     "github.com/tikv/pd/pkg/utils/tsoutil"
     "github.com/tikv/pd/pkg/utils/typeutil"
     "go.etcd.io/etcd/clientv3"
@@ -175,6 +176,7 @@ func (lta *LocalTSOAllocator) CampaignAllocatorLeader(leaseTimeout int64, cmps .
 
 // KeepAllocatorLeader is used to keep the PD leader's leadership.
 func (lta *LocalTSOAllocator) KeepAllocatorLeader(ctx context.Context) {
+    defer logutil.LogPanic()
     lta.leadership.Keep(ctx)
 }
 
diff --git a/pkg/utils/metricutil/metricutil.go b/pkg/utils/metricutil/metricutil.go
index 3e802fa8952..f0f0220c311 100644
--- a/pkg/utils/metricutil/metricutil.go
+++ b/pkg/utils/metricutil/metricutil.go
@@ -23,6 +23,7 @@ import (
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/push"
     "github.com/tikv/pd/pkg/errs"
+    "github.com/tikv/pd/pkg/utils/logutil"
     "github.com/tikv/pd/pkg/utils/typeutil"
 )
 
@@ -63,6 +64,8 @@ func camelCaseToSnakeCase(str string) string {
 
 // prometheusPushClient pushes metrics to Prometheus Pushgateway.
 func prometheusPushClient(job, addr string, interval time.Duration) {
+    defer logutil.LogPanic()
+
     pusher := push.New(addr, job).
         Gatherer(prometheus.DefaultGatherer).
         Grouping("instance", instanceName())
diff --git a/pkg/utils/tsoutil/tso_dispatcher.go b/pkg/utils/tsoutil/tso_dispatcher.go
index 351fe424b16..8211471b3d6 100644
--- a/pkg/utils/tsoutil/tso_dispatcher.go
+++ b/pkg/utils/tsoutil/tso_dispatcher.go
@@ -23,6 +23,7 @@ import (
     "github.com/pingcap/log"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/tikv/pd/pkg/errs"
+    "github.com/tikv/pd/pkg/utils/logutil"
     "go.uber.org/zap"
     "google.golang.org/grpc"
 )
@@ -71,6 +72,7 @@ func (s *TSODispatcher) DispatchRequest(
 func (s *TSODispatcher) dispatch(
     ctx context.Context, tsoProtoFactory ProtoFactory, forwardedHost string, clientConn *grpc.ClientConn,
     tsoRequestCh <-chan Request, tsDeadlineCh chan<- deadline, doneCh <-chan struct{}, errCh chan<- error) {
+    defer logutil.LogPanic()
     dispatcherCtx, ctxCancel := context.WithCancel(ctx)
     defer ctxCancel()
     defer s.dispatchChs.Delete(forwardedHost)
@@ -187,6 +189,7 @@ type deadline struct {
 }
 
 func watchTSDeadline(ctx context.Context, tsDeadlineCh <-chan deadline) {
+    defer logutil.LogPanic()
     ctx, cancel := context.WithCancel(ctx)
     defer cancel()
     for {
@@ -209,6 +212,8 @@ func watchTSDeadline(ctx context.Context, tsDeadlineCh <-chan deadline) {
 }
 
 func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) {
+    defer logutil.LogPanic()
+
     select {
     case <-done:
         return
diff --git a/server/cluster/coordinator.go b/server/cluster/coordinator.go
index 932e1d185cd..d4c45c8b254 100644
--- a/server/cluster/coordinator.go
+++ b/server/cluster/coordinator.go
@@ -223,6 +223,7 @@ func (c *coordinator) checkPriorityRegions() {
 // The regions of new version key range and old version key range would be placed into
 // the suspect regions map
 func (c *coordinator) checkSuspectRanges() {
+    defer logutil.LogPanic()
     defer c.wg.Done()
     log.Info("coordinator begins to check suspect key ranges")
     ticker := time.NewTicker(checkSuspectRangesInterval)
diff --git a/server/grpc_service.go b/server/grpc_service.go
index 64136d1d079..4f39b62b224 100644
--- a/server/grpc_service.go
+++ b/server/grpc_service.go
@@ -560,6 +560,7 @@ func (b *bucketHeartbeatServer) Send(bucket *pdpb.ReportBucketsResponse) error {
     }
     done := make(chan error, 1)
     go func() {
+        defer logutil.LogPanic()
         done <- b.stream.SendAndClose(bucket)
     }()
     select {
@@ -598,7 +599,10 @@ func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error {
         return io.EOF
     }
     done := make(chan error, 1)
-    go func() { done <- s.stream.Send(m) }()
+    go func() {
+        defer logutil.LogPanic()
+        done <- s.stream.Send(m)
+    }()
     select {
     case err := <-done:
         if err != nil {
@@ -1711,6 +1715,7 @@ func (s *GrpcServer) createHeartbeatForwardStream(client *grpc.ClientConn) (pdpb
 }
 
 func forwardRegionHeartbeatClientToServer(forwardStream pdpb.PD_RegionHeartbeatClient, server *heartbeatServer, errCh chan error) {
+    defer logutil.LogPanic()
     defer close(errCh)
     for {
         resp, err := forwardStream.Recv()
@@ -1735,6 +1740,7 @@ func (s *GrpcServer) createReportBucketsForwardStream(client *grpc.ClientConn) (
 }
 
 func forwardReportBucketClientToServer(forwardStream pdpb.PD_ReportBucketsClient, server *bucketHeartbeatServer, errCh chan error) {
+    defer logutil.LogPanic()
     defer close(errCh)
     for {
         resp, err := forwardStream.CloseAndRecv()
@@ -1751,6 +1757,7 @@ func forwardReportBucketClientToServer(forwardStream pdpb.PD_ReportBucketsClient
 
 // TODO: If goroutine here timeout when tso stream created successfully, we need to handle it correctly.
 func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) {
+    defer logutil.LogPanic()
     select {
     case <-done:
         return
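
The change is mechanical: each long-running or fire-and-forget goroutine now defers a panic-logging helper as one of its first statements, so a panic inside the goroutine is recorded in PD's structured log rather than vanishing when the process goes down. The helper itself (pkg/utils/logutil.LogPanic) is not part of this diff; the stand-alone sketch below only illustrates the assumed recover-and-log pattern, with a hypothetical logPanic function and a zap logger standing in for the real helper.

    package main

    import (
        "time"

        "go.uber.org/zap"
    )

    // logPanic is a hypothetical stand-in for logutil.LogPanic: when deferred at
    // the top of a goroutine, it recovers a panic and records it, including the
    // stack, in the structured log. The real helper may also terminate the
    // process after logging; this sketch only logs so the example can finish.
    func logPanic(logger *zap.Logger) {
        if e := recover(); e != nil {
            logger.Error("goroutine panicked", zap.Any("recover", e), zap.Stack("stack"))
        }
    }

    func main() {
        logger, _ := zap.NewDevelopment()

        go func() {
            // Without this deferred call the panic below would crash the
            // process without leaving any entry in the log.
            defer logPanic(logger)
            panic("boom")
        }()

        time.Sleep(100 * time.Millisecond) // give the goroutine time to run
    }

recover only intercepts a panic when it is called from a function deferred in the panicking goroutine itself, which is why every go func body and background loop in the patch needs its own deferred call instead of relying on a recover set up by the parent goroutine.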