diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index b3ae6b5e631..3ad64d48358 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -147,6 +147,8 @@ Usage of vtgate: Send unnormalized SQL for individually reported queries --insights_raw_queries_max_length uint Maximum size for unnormalized SQL (default 8192) + --insights_reorder_threshold uint + Reorder INSERT columns if at least this many redundant patterns in an interval (default 5) --insights_rows_read_threshold uint Report individual transactions that read (scan) at least this many rows (default 10000) --insights_rt_threshold uint diff --git a/go/vt/vtgate/insights.go b/go/vt/vtgate/insights.go index d730af20796..b2ead9772f2 100644 --- a/go/vt/vtgate/insights.go +++ b/go/vt/vtgate/insights.go @@ -64,10 +64,6 @@ const ( BrokersVar = "INSIGHTS_KAFKA_BROKERS" UsernameVar = "INSIGHTS_KAFKA_USERNAME" PasswordVar = "INSIGHTS_KAFKA_PASSWORD" - - // normalize more aggressively by alphabetizing INSERT columns if there are more than this many - // otherwise identical patterns in a single interval - ReorderColumnThreshold = 5 ) type QueryPatternAggregation struct { @@ -116,6 +112,7 @@ type Insights struct { KafkaText bool // use human-readable pb, for tests and debugging SendRawQueries bool MaxRawQueryLength uint + ReorderThreshold uint // alphabetize columns if >= this many redundant patterns in 15s // state KafkaWriter *kafka.Writer @@ -127,7 +124,7 @@ type Insights struct { Workers sync.WaitGroup QueriesThisInterval uint ReorderInsertColumns bool - ColIndependentHashes map[uint32]int + ColIndependentHashes map[uint32]uint // log state: we limit some log messages to once per 15s because they're caused by behavior the // client controls @@ -181,6 +178,10 @@ var ( // insightsRawQueriesMaxLength is the longest string, in bytes, we will send as the RawSql field in a Kafka message insightsRawQueriesMaxLength = flag.Uint("insights_raw_queries_max_length", 8192, "Maximum 
size for unnormalized SQL") + + // normalize more aggressively by alphabetizing INSERT columns if there are at least this many + // otherwise identical patterns in a single interval + insightsReorderThreshold = flag.Uint("insights_reorder_threshold", 5, "Reorder INSERT columns if at least this many redundant patterns in an interval") ) func initInsights(logger *streamlog.StreamLogger) (*Insights, error) { @@ -195,6 +196,7 @@ func initInsights(logger *streamlog.StreamLogger) (*Insights, error) { *insightsRTThreshold, *insightsMaxQueriesPerInterval, *insightsRawQueriesMaxLength, + *insightsReorderThreshold, time.Duration(*insightsFlushInterval)*time.Second, *insightsKafkaText, *insightsRawQueries, @@ -203,7 +205,7 @@ func initInsights(logger *streamlog.StreamLogger) (*Insights, error) { func initInsightsInner(logger *streamlog.StreamLogger, brokers, publicID, username, password string, - bufsize, patternLimit, rowsReadThreshold, responseTimeThreshold, maxQueriesPerInterval, maxRawQueryLength uint, + bufsize, patternLimit, rowsReadThreshold, responseTimeThreshold, maxQueriesPerInterval, maxRawQueryLength, reorderThreshold uint, interval time.Duration, kafkaText, sendRawQueries bool) (*Insights, error) { @@ -229,6 +231,7 @@ func initInsightsInner(logger *streamlog.StreamLogger, KafkaText: kafkaText, SendRawQueries: sendRawQueries, MaxRawQueryLength: maxRawQueryLength, + ReorderThreshold: reorderThreshold, } insights.Sender = insights.sendToKafka err := insights.logToKafka(logger) @@ -267,7 +270,7 @@ func (ii *Insights) startInterval() { ii.QueriesThisInterval = 0 ii.Aggregations = make(map[QueryPatternKey]*QueryPatternAggregation) ii.PeriodStart = time.Now() - ii.ColIndependentHashes = make(map[uint32]int) + ii.ColIndependentHashes = make(map[uint32]uint) } func (ii *Insights) shouldSendToInsights(ls *logstats.LogStats) bool { @@ -595,7 +598,7 @@ func (ii *Insights) addToAggregates(ls *logstats.LogStats, sql string, ciHash *u if ciHash != nil && 
!ii.ReorderInsertColumns { ii.ColIndependentHashes[*ciHash]++ - if ii.ColIndependentHashes[*ciHash] >= ReorderColumnThreshold { + if ii.ColIndependentHashes[*ciHash] >= ii.ReorderThreshold { log.Info("Enabling ReorderInsertColumns") ii.ReorderInsertColumns = true } diff --git a/go/vt/vtgate/insights_test.go b/go/vt/vtgate/insights_test.go index cd8604e4ec7..f8f24c32bfa 100644 --- a/go/vt/vtgate/insights_test.go +++ b/go/vt/vtgate/insights_test.go @@ -62,7 +62,7 @@ func setup(t *testing.T, brokers, publicID, username, password string, options s dfl(options.responseTimeThreshold, 1000), dfl(options.maxPerInterval, 100), dfl(options.maxRawQuerySize, 64), - 15*time.Second, true, true) + 5, 15*time.Second, true, true) if insights != nil { t.Cleanup(func() { insights.Drain() }) }