Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

topsql: introduce datasink interface #30662

Merged
merged 9 commits into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1568,7 +1568,7 @@ func TestTopSQLAgent(t *testing.T) {
dbt.MustExec("set @@global.tidb_top_sql_report_interval_seconds=2;")
dbt.MustExec("set @@global.tidb_top_sql_max_statement_count=5;")

r := reporter.NewRemoteTopSQLReporter(reporter.NewGRPCReportClient(plancodec.DecodeNormalizedPlan))
r := reporter.NewRemoteTopSQLReporter(reporter.NewSingleTargetDataSink(plancodec.DecodeNormalizedPlan))
tracecpu.GlobalSQLCPUProfiler.SetCollector(&collectorWrapper{r})

// TODO: change to ensure that the right sql statements are reported, not just counts
Expand Down
36 changes: 36 additions & 0 deletions util/topsql/reporter/datasink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package reporter

import "time"

// DataSink collects and sends data to a target.
type DataSink interface {
// Send pushes a report data into the sink, which will later be sent to a target by the sink. A deadline can be
// specified to control how late it should be sent. If the sink is kept full and cannot schedule a send within
// the specified deadline, the data will be silently dropped.
Send(data ReportData, deadline time.Time)

// IsPaused indicates that the DataSink is not expecting to receive records for now
// and may resume in the future.
IsPaused() bool

// IsDown indicates that the DataSink has been down and can be cleared.
// Note that: once a DataSink is down, it cannot go back to be up.
IsDown() bool

// Close cleans up resources owned by this DataSink
Close()
}
40 changes: 15 additions & 25 deletions util/topsql/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -118,9 +117,9 @@ type planBinaryDecodeFunc func(string) (string, error)
// RemoteTopSQLReporter implements a TopSQL reporter that sends data to a remote agent
// This should be called periodically to collect TopSQL resource usage metrics
type RemoteTopSQLReporter struct {
ctx context.Context
cancel context.CancelFunc
client ReportClient
ctx context.Context
cancel context.CancelFunc
dataSink DataSink

// normalizedSQLMap is an map, whose keys are SQL digest strings and values are SQLMeta.
normalizedSQLMap atomic.Value // sync.Map
Expand All @@ -145,12 +144,12 @@ type SQLMeta struct {
//
// planBinaryDecoder is a decoding function which will be called asynchronously to decode the plan binary to string
// MaxStatementsNum is the maximum SQL and plan number, which will restrict the memory usage of the internal LFU cache
func NewRemoteTopSQLReporter(client ReportClient) *RemoteTopSQLReporter {
func NewRemoteTopSQLReporter(dataSink DataSink) *RemoteTopSQLReporter {
ctx, cancel := context.WithCancel(context.Background())
tsr := &RemoteTopSQLReporter{
ctx: ctx,
cancel: cancel,
client: client,
dataSink: dataSink,
collectCPUDataChan: make(chan cpuData, 1),
reportCollectedDataChan: make(chan collectedData, 1),
}
Expand Down Expand Up @@ -238,7 +237,7 @@ func (tsr *RemoteTopSQLReporter) Collect(timestamp uint64, records []tracecpu.SQ
// Close uses to close and release the reporter resource.
func (tsr *RemoteTopSQLReporter) Close() {
tsr.cancel()
tsr.client.Close()
tsr.dataSink.Close()
}

func addEvictedCPUTime(collectTarget map[string]*dataPoints, timestamp uint64, totalCPUTimeMs uint32) {
Expand Down Expand Up @@ -464,15 +463,15 @@ type collectedData struct {
normalizedPlanMap *sync.Map
}

// reportData contains data that reporter sends to the agent
type reportData struct {
// ReportData contains data that reporter sends to the agent.
type ReportData struct {
// collectedData contains the topN collected records and the `others` record, which aggregates all records outside the Top N.
collectedData []*dataPoints
// normalizedSQLMap presumably maps SQL digest strings to normalized SQL metadata (mirrors the reporter's field of the same name) — TODO confirm against the reporter.
normalizedSQLMap *sync.Map
// normalizedPlanMap presumably maps plan digest strings to normalized plan text — TODO confirm against the reporter.
normalizedPlanMap *sync.Map
}

func (d *reportData) hasData() bool {
func (d *ReportData) hasData() bool {
if len(d.collectedData) > 0 {
return true
}
Expand Down Expand Up @@ -510,9 +509,9 @@ func (tsr *RemoteTopSQLReporter) reportWorker() {
}
}

// getReportData gets reportData from the collectedData.
// getReportData gets ReportData from the collectedData.
// This function will calculate the topN collected records and the `others` record which aggregation all records that is out of Top N.
func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) reportData {
func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) ReportData {
// Fetch TopN dataPoints.
others := collected.records[keyOthers]
delete(collected.records, keyOthers)
Expand All @@ -539,21 +538,20 @@ func (tsr *RemoteTopSQLReporter) getReportData(collected collectedData) reportDa
records = append(records, others)
}

return reportData{
return ReportData{
collectedData: records,
normalizedSQLMap: collected.normalizedSQLMap,
normalizedPlanMap: collected.normalizedPlanMap,
}
}

func (tsr *RemoteTopSQLReporter) doReport(data reportData) {
func (tsr *RemoteTopSQLReporter) doReport(data ReportData) {
defer util.Recover("top-sql", "doReport", nil, false)

if !data.hasData() {
return
}

agentAddr := config.GetGlobalConfig().TopSQL.ReceiverAddress
timeout := reportTimeout
failpoint.Inject("resetTimeoutForTest", func(val failpoint.Value) {
if val.(bool) {
Expand All @@ -563,14 +561,6 @@ func (tsr *RemoteTopSQLReporter) doReport(data reportData) {
}
}
})
ctx, cancel := context.WithTimeout(tsr.ctx, timeout)
start := time.Now()
err := tsr.client.Send(ctx, agentAddr, data)
if err != nil {
logutil.BgLogger().Warn("[top-sql] client failed to send data", zap.Error(err))
reportAllDurationFailedHistogram.Observe(time.Since(start).Seconds())
} else {
reportAllDurationSuccHistogram.Observe(time.Since(start).Seconds())
}
cancel()
deadline := time.Now().Add(timeout)
tsr.dataSink.Send(data, deadline)
}
2 changes: 1 addition & 1 deletion util/topsql/reporter/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func setupRemoteTopSQLReporter(maxStatementsNum, interval int, addr string) *Rem
conf.TopSQL.ReceiverAddress = addr
})

rc := NewGRPCReportClient(mockPlanBinaryDecoderFunc)
rc := NewSingleTargetDataSink(mockPlanBinaryDecoderFunc)
ts := NewRemoteTopSQLReporter(rc)
return ts
}
Expand Down
Loading