Add -insights_reorder_threshold flag (vitessio#1277)
* Add -insights_reorder_threshold flag

We reorder the column list in INSERT statements if there are more than N
patterns in a single interval that differ only in the order of their
column lists.  N used to be a constant 5, and now it's configurable on the
command line.
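
[Editor's note: a minimal sketch of the counting mechanism, distilled from the insights.go diff below. The field names (ReorderThreshold, ReorderInsertColumns, ColIndependentHashes) come from the real Insights struct; the standalone struct and the noteColumnIndependentHash helper here are simplified stand-ins, and in vtgate the map is reset at the start of each 15-second interval (see startInterval).]

package main

import "fmt"

type Insights struct {
	ReorderThreshold     uint            // value of --insights_reorder_threshold (default 5)
	ReorderInsertColumns bool            // once set, INSERT column lists are alphabetized
	ColIndependentHashes map[uint32]uint // per-interval counts of column-order-independent pattern hashes
}

// noteColumnIndependentHash counts occurrences of a pattern hash that ignores
// INSERT column order; when one hash reaches the threshold within an interval,
// column reordering is switched on for subsequent queries.
func (ii *Insights) noteColumnIndependentHash(ciHash uint32) {
	if ii.ReorderInsertColumns {
		return // already reordering; no need to keep counting
	}
	ii.ColIndependentHashes[ciHash]++
	if ii.ColIndependentHashes[ciHash] >= ii.ReorderThreshold {
		ii.ReorderInsertColumns = true
	}
}

func main() {
	ii := &Insights{ReorderThreshold: 5, ColIndependentHashes: make(map[uint32]uint)}
	for i := 0; i < 5; i++ {
		ii.noteColumnIndependentHash(0xabcd1234) // same pattern, differing only in column order
	}
	fmt.Println(ii.ReorderInsertColumns) // true: the count reached the threshold
}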

Signed-off-by: Patrick Reynolds <[email protected]>

* Add the new flag to the e2e test

Signed-off-by: Patrick Reynolds <[email protected]>

piki authored Nov 7, 2022
1 parent 87fedb2 commit 2c375ac
Showing 3 changed files with 14 additions and 9 deletions.
2 changes: 2 additions & 0 deletions go/flags/endtoend/vtgate.txt
@@ -147,6 +147,8 @@ Usage of vtgate:
       Send unnormalized SQL for individually reported queries
 --insights_raw_queries_max_length uint
       Maximum size for unnormalized SQL (default 8192)
+--insights_reorder_threshold uint
+      Reorder INSERT columns if more than this many redundant patterns in an interval (default 5)
 --insights_rows_read_threshold uint
       Report individual transactions that read (scan) at least this many rows (default 10000)
 --insights_rt_threshold uint
19 changes: 11 additions & 8 deletions go/vt/vtgate/insights.go
@@ -64,10 +64,6 @@ const (
 	BrokersVar  = "INSIGHTS_KAFKA_BROKERS"
 	UsernameVar = "INSIGHTS_KAFKA_USERNAME"
 	PasswordVar = "INSIGHTS_KAFKA_PASSWORD"
-
-	// normalize more aggressively by alphabetizing INSERT columns if there are more than this many
-	// otherwise identical patterns in a single interval
-	ReorderColumnThreshold = 5
 )

 type QueryPatternAggregation struct {
@@ -116,6 +112,7 @@ type Insights struct {
 	KafkaText         bool // use human-readable pb, for tests and debugging
 	SendRawQueries    bool
 	MaxRawQueryLength uint
+	ReorderThreshold  uint // alphabetize columns if >= this many redundant patterns in 15s

// state
KafkaWriter *kafka.Writer
Expand All @@ -127,7 +124,7 @@ type Insights struct {
Workers sync.WaitGroup
QueriesThisInterval uint
ReorderInsertColumns bool
ColIndependentHashes map[uint32]int
ColIndependentHashes map[uint32]uint

 	// log state: we limit some log messages to once per 15s because they're caused by behavior the
 	// client controls
@@ -181,6 +178,10 @@ var (

 	// insightsRawQueriesMaxLength is the longest string, in bytes, we will send as the RawSql field in a Kafka message
 	insightsRawQueriesMaxLength = flag.Uint("insights_raw_queries_max_length", 8192, "Maximum size for unnormalized SQL")
+
+	// normalize more aggressively by alphabetizing INSERT columns if there are more than this many
+	// otherwise identical patterns in a single interval
+	insightsReorderThreshold = flag.Uint("insights_reorder_threshold", 5, "Reorder INSERT columns if more than this many redundant patterns in an interval")
 )

 func initInsights(logger *streamlog.StreamLogger) (*Insights, error) {
@@ -195,6 +196,7 @@ func initInsights(logger *streamlog.StreamLogger) (*Insights, error) {
 		*insightsRTThreshold,
 		*insightsMaxQueriesPerInterval,
 		*insightsRawQueriesMaxLength,
+		*insightsReorderThreshold,
 		time.Duration(*insightsFlushInterval)*time.Second,
 		*insightsKafkaText,
 		*insightsRawQueries,
@@ -203,7 +205,7 @@

 func initInsightsInner(logger *streamlog.StreamLogger,
 	brokers, publicID, username, password string,
-	bufsize, patternLimit, rowsReadThreshold, responseTimeThreshold, maxQueriesPerInterval, maxRawQueryLength uint,
+	bufsize, patternLimit, rowsReadThreshold, responseTimeThreshold, maxQueriesPerInterval, maxRawQueryLength, reorderThreshold uint,
 	interval time.Duration,
 	kafkaText, sendRawQueries bool) (*Insights, error) {

@@ -229,6 +231,7 @@ func initInsightsInner(logger *streamlog.StreamLogger,
 		KafkaText:         kafkaText,
 		SendRawQueries:    sendRawQueries,
 		MaxRawQueryLength: maxRawQueryLength,
+		ReorderThreshold:  reorderThreshold,
 	}
 	insights.Sender = insights.sendToKafka
 	err := insights.logToKafka(logger)
@@ -267,7 +270,7 @@ func (ii *Insights) startInterval() {
 	ii.QueriesThisInterval = 0
 	ii.Aggregations = make(map[QueryPatternKey]*QueryPatternAggregation)
 	ii.PeriodStart = time.Now()
-	ii.ColIndependentHashes = make(map[uint32]int)
+	ii.ColIndependentHashes = make(map[uint32]uint)
 }

 func (ii *Insights) shouldSendToInsights(ls *logstats.LogStats) bool {
@@ -595,7 +598,7 @@ func (ii *Insights) addToAggregates(ls *logstats.LogStats, sql string, ciHash *uint32

 	if ciHash != nil && !ii.ReorderInsertColumns {
 		ii.ColIndependentHashes[*ciHash]++
-		if ii.ColIndependentHashes[*ciHash] >= ReorderColumnThreshold {
+		if ii.ColIndependentHashes[*ciHash] >= ii.ReorderThreshold {
 			log.Info("Enabling ReorderInsertColumns")
 			ii.ReorderInsertColumns = true
 		}
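
[Editor's note: the flag's default of 5 matches the old hard-coded ReorderColumnThreshold constant, so deployments that never set it keep the previous behavior. A hypothetical invocation that raises the threshold, with all other vtgate flags elided:

	vtgate --insights_reorder_threshold 20 ...]
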
2 changes: 1 addition & 1 deletion go/vt/vtgate/insights_test.go
@@ -62,7 +62,7 @@ func setup(t *testing.T, brokers, publicID, username, password string, options s
 		dfl(options.responseTimeThreshold, 1000),
 		dfl(options.maxPerInterval, 100),
 		dfl(options.maxRawQuerySize, 64),
-		15*time.Second, true, true)
+		5, 15*time.Second, true, true)
 	if insights != nil {
 		t.Cleanup(func() { insights.Drain() })
 	}