Skip to content

Commit

Permalink
slack-vitess-r14.0.5-dsdefense: backport directive rename from vite…
Browse files Browse the repository at this point in the history
  • Loading branch information
timvaillancourt authored Apr 19, 2023
1 parent 9a9e058 commit cfdbd1e
Show file tree
Hide file tree
Showing 17 changed files with 795 additions and 789 deletions.
4 changes: 2 additions & 2 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -935,8 +935,8 @@ age_bad_rate_after_sec: 180
bad_rate_increase: 0.1
max_rate_approach_threshold: 0.9
)
--tx-throttler-default-criticality int
Default criticality assigned to queries that lack criticality information.
--tx-throttler-default-priority int
Default priority assigned to queries that lack priority information.
--tx-throttler-healthcheck-cells value
Synonym to -tx_throttler_healthcheck_cells
--tx_throttler_config string
Expand Down
1,347 changes: 673 additions & 674 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions go/vt/proto/query/query_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 16 additions & 16 deletions go/vt/sqlparser/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ const (
DirectiveQueryPlanner = "PLANNER"
// DirectiveWorkloadName specifies the name of the client application workload issuing the query.
DirectiveWorkloadName = "WORKLOAD_NAME"
// DirectiveCriticality specifies the criticality of a workload. It should be an integer between 0 and 100, where
// 100 is the highest criticality, and 0 is the lowest one.
DirectiveCriticality = "CRITICALITY"
// DirectivePriority specifies the priority of a workload. It should be an integer between 0 and 100, where
// 100 is the highest priority, and 0 is the lowest one.
DirectivePriority = "PRIORITY"

// MaxCriticalityValue specifies the maximum value allowed for criticality. Valid criticality values are
// between zero and MaxCriticalityValue.
MaxCriticalityValue = 100
// MaxPriorityValue specifies the maximum value allowed for priority. Valid priority values are
// between zero and MaxPriorityValue.
MaxPriorityValue = 100
)

var ErrInvalidCriticality = vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Invalid criticality value specified in query")
var ErrInvalidPriority = vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Invalid priority value specified in query")

func isNonSpace(r rune) bool {
return !unicode.IsSpace(r)
Expand Down Expand Up @@ -384,27 +384,27 @@ func CommentsForStatement(stmt Statement) Comments {
return nil
}

// GetCriticalityFromStatement gets the criticality from the provided Statement, using DirectiveCriticality
func GetCriticalityFromStatement(statement Statement) (string, error) {
// GetPriorityFromStatement gets the priority from the provided Statement, using DirectivePriority
func GetPriorityFromStatement(statement Statement) (string, error) {
commentedStatement, ok := statement.(Commented)
// This would mean that the statement lacks comments, so we can't obtain the workload from it. Hence default to
// empty criticality
// empty priority
if !ok {
return "", nil
}

directives := commentedStatement.GetParsedComments().Directives()
criticality := directives.GetString(DirectiveCriticality, "")
if !ok || criticality == "" {
priority := directives.GetString(DirectivePriority, "")
if priority == "" {
return "", nil
}

intCriticality, err := strconv.Atoi(criticality)
if err != nil || intCriticality < 0 || intCriticality > MaxCriticalityValue {
return "", ErrInvalidCriticality
intPriority, err := strconv.Atoi(priority)
if err != nil || intPriority < 0 || intPriority > MaxPriorityValue {
return "", ErrInvalidPriority
}

return criticality, nil
return priority, nil
}

// GetWorkloadNameFromStatement gets the workload name from the provided Statement, using workloadLabel as the name of
Expand Down
60 changes: 30 additions & 30 deletions go/vt/sqlparser/comments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,51 +480,51 @@ func TestIgnoreMaxMaxMemoryRowsDirective(t *testing.T) {
}
}

func TestGetCriticalityFromStatement(t *testing.T) {
func TestGetPriorityFromStatement(t *testing.T) {
testCases := []struct {
query string
expectedCriticality string
expectedError error
query string
expectedPriority string
expectedError error
}{
{
query: "select * from a_table",
expectedCriticality: "",
expectedError: nil,
query: "select * from a_table",
expectedPriority: "",
expectedError: nil,
},
{
query: "select /*vt+ ANOTHER_DIRECTIVE=324 */ * from another_table",
expectedCriticality: "",
expectedError: nil,
query: "select /*vt+ ANOTHER_DIRECTIVE=324 */ * from another_table",
expectedPriority: "",
expectedError: nil,
},
{
query: "select /*vt+ CRITICALITY=33 */ * from another_table",
expectedCriticality: "33",
expectedError: nil,
query: "select /*vt+ PRIORITY=33 */ * from another_table",
expectedPriority: "33",
expectedError: nil,
},
{
query: "select /*vt+ CRITICALITY=200 */ * from another_table",
expectedCriticality: "",
expectedError: ErrInvalidCriticality,
query: "select /*vt+ PRIORITY=200 */ * from another_table",
expectedPriority: "",
expectedError: ErrInvalidPriority,
},
{
query: "select /*vt+ CRITICALITY=-1 */ * from another_table",
expectedCriticality: "",
expectedError: ErrInvalidCriticality,
query: "select /*vt+ PRIORITY=-1 */ * from another_table",
expectedPriority: "",
expectedError: ErrInvalidPriority,
},
{
query: "select /*vt+ CRITICALITY=some_text */ * from another_table",
expectedCriticality: "",
expectedError: ErrInvalidCriticality,
query: "select /*vt+ PRIORITY=some_text */ * from another_table",
expectedPriority: "",
expectedError: ErrInvalidPriority,
},
{
query: "select /*vt+ CRITICALITY=0 */ * from another_table",
expectedCriticality: "0",
expectedError: nil,
query: "select /*vt+ PRIORITY=0 */ * from another_table",
expectedPriority: "0",
expectedError: nil,
},
{
query: "select /*vt+ CRITICALITY=100 */ * from another_table",
expectedCriticality: "100",
expectedError: nil,
query: "select /*vt+ PRIORITY=100 */ * from another_table",
expectedPriority: "100",
expectedError: nil,
},
}

Expand All @@ -534,12 +534,12 @@ func TestGetCriticalityFromStatement(t *testing.T) {
t.Parallel()
stmt, err := Parse(theThestCase.query)
assert.NoError(t, err)
actualCriticality, actualError := GetCriticalityFromStatement(stmt)
actualPriority, actualError := GetPriorityFromStatement(stmt)
if theThestCase.expectedError != nil {
assert.ErrorIs(t, actualError, theThestCase.expectedError)
} else {
assert.NoError(t, err)
assert.Equal(t, theThestCase.expectedCriticality, actualCriticality)
assert.Equal(t, theThestCase.expectedPriority, actualPriority)
}
})
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (t *noopVCursor) SetPlannerVersion(querypb.ExecuteOptions_PlannerVersion) {
panic("implement me")
}

func (t *noopVCursor) SetCriticality(string) {
func (t *noopVCursor) SetPriority(string) {
panic("implement me")
}

Expand Down Expand Up @@ -710,7 +710,7 @@ func (f *loggingVCursor) SetPlannerVersion(querypb.ExecuteOptions_PlannerVersion
panic("implement me")
}

func (f *loggingVCursor) SetCriticality(string) {
func (f *loggingVCursor) SetPriority(string) {
panic("implement me")
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ type (
SetWorkload(querypb.ExecuteOptions_Workload)
SetPlannerVersion(querypb.ExecuteOptions_PlannerVersion)
SetWorkloadName(string)
SetCriticality(string)
SetPriority(string)
SetFoundRows(uint64)

SetDDLStrategy(string)
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,11 +973,11 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser.
}
ignoreMaxMemoryRows := sqlparser.IgnoreMaxMaxMemoryRowsDirective(stmt)
vcursor.SetIgnoreMaxMemoryRows(ignoreMaxMemoryRows)
criticality, err := sqlparser.GetCriticalityFromStatement(stmt)
priority, err := sqlparser.GetPriorityFromStatement(stmt)
if err != nil {
return nil, err
}
vcursor.SetCriticality(criticality)
vcursor.SetPriority(priority)
vcursor.SetWorkloadName(sqlparser.GetWorkloadNameFromStatement(stmt))

setVarComment, err := prepareSetVarComment(vcursor, stmt)
Expand Down
24 changes: 12 additions & 12 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1810,16 +1810,16 @@ func TestGetPlanNormalized(t *testing.T) {
assertCacheContains(t, r, want)
}

func TestGetPlanCriticalityCriticality(t *testing.T) {
func TestGetPlanPriority(t *testing.T) {
testCases := []struct {
name string
sql string
expectedCriticality string
expectedError error
name string
sql string
expectedPriority string
expectedError error
}{
{name: "empty criticality", sql: "select * from music_user_map", expectedCriticality: "", expectedError: nil},
{name: "Invalid criticality", sql: "select /*vt+ CRITICALITY=something */ * from music_user_map", expectedCriticality: "", expectedError: sqlparser.ErrInvalidCriticality},
{name: "Valid criticality", sql: "select /*vt+ CRITICALITY=33 */ * from music_user_map", expectedCriticality: "33", expectedError: nil},
{name: "empty priority", sql: "select * from music_user_map", expectedPriority: "", expectedError: nil},
{name: "Invalid priority", sql: "select /*vt+ PRIORITY=something */ * from music_user_map", expectedPriority: "", expectedError: sqlparser.ErrInvalidPriority},
{name: "Valid priority", sql: "select /*vt+ PRIORITY=33 */ * from music_user_map", expectedPriority: "33", expectedError: nil},
}

for _, aTestCase := range testCases {
Expand All @@ -1832,17 +1832,17 @@ func TestGetPlanCriticalityCriticality(t *testing.T) {
vCursor, err := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown", Options: &querypb.ExecuteOptions{}}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
assert.NoError(t, err)

stmt1, err := sqlparser.Parse(testCase.sql)
stmt, err := sqlparser.Parse(testCase.sql)
assert.NoError(t, err)
crticalityFromStatement, _ := sqlparser.GetCriticalityFromStatement(stmt1)
crticalityFromStatement, _ := sqlparser.GetPriorityFromStatement(stmt)

_, err = r.getPlan(vCursor, testCase.sql, makeComments("/* some comment */"), map[string]*querypb.BindVariable{}, &SafeSession{Session: &vtgatepb.Session{Options: &querypb.ExecuteOptions{}}}, logStats)
if testCase.expectedError != nil {
assert.ErrorIs(t, err, testCase.expectedError)
} else {
assert.NoError(t, err)
assert.Equal(t, testCase.expectedCriticality, crticalityFromStatement)
assert.Equal(t, testCase.expectedCriticality, vCursor.safeSession.Options.Criticality)
assert.Equal(t, testCase.expectedPriority, crticalityFromStatement)
assert.Equal(t, testCase.expectedPriority, vCursor.safeSession.Options.Priority)
}
})
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,9 +748,9 @@ func (vc *vcursorImpl) SetPlannerVersion(v plancontext.PlannerVersion) {
vc.safeSession.GetOrCreateOptions().PlannerVersion = v
}

func (vc *vcursorImpl) SetCriticality(criticality string) {
if criticality != "" {
vc.safeSession.GetOrCreateOptions().Criticality = criticality
func (vc *vcursorImpl) SetPriority(priority string) {
if priority != "" {
vc.safeSession.GetOrCreateOptions().Priority = priority
}
}

Expand Down
18 changes: 9 additions & 9 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func init() {
flagutil.DualFormatBoolVar(&currentConfig.EnableTxThrottler, "enable_tx_throttler", defaultConfig.EnableTxThrottler, "If true replication-lag-based throttling on transactions will be enabled.")
flagutil.DualFormatStringVar(&currentConfig.TxThrottlerConfig, "tx_throttler_config", defaultConfig.TxThrottlerConfig, "The configuration of the transaction throttler as a text formatted throttlerdata.Configuration protocol buffer message")
flagutil.DualFormatStringListVar(&currentConfig.TxThrottlerHealthCheckCells, "tx_throttler_healthcheck_cells", defaultConfig.TxThrottlerHealthCheckCells, "A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.")
flag.IntVar(&currentConfig.TxThrottlerDefaultCriticality, "tx-throttler-default-criticality", defaultConfig.TxThrottlerDefaultCriticality, "Default criticality assigned to queries that lack criticality information.")
flag.IntVar(&currentConfig.TxThrottlerDefaultPriority, "tx-throttler-default-priority", defaultConfig.TxThrottlerDefaultPriority, "Default priority assigned to queries that lack priority information.")

flag.BoolVar(&enableHotRowProtection, "enable_hot_row_protection", false, "If true, incoming transactions for the same row (range) will be queued and cannot consume all txpool slots.")
flag.BoolVar(&enableHotRowProtectionDryRun, "enable_hot_row_protection_dry_run", false, "If true, hot row protection is not enforced but logs if transactions would have been queued.")
Expand Down Expand Up @@ -289,10 +289,10 @@ type TabletConfig struct {
TwoPCCoordinatorAddress string `json:"-"`
TwoPCAbandonAge Seconds `json:"-"`

EnableTxThrottler bool `json:"-"`
TxThrottlerConfig string `json:"-"`
TxThrottlerHealthCheckCells []string `json:"-"`
TxThrottlerDefaultCriticality int `json:"-"`
EnableTxThrottler bool `json:"-"`
TxThrottlerConfig string `json:"-"`
TxThrottlerHealthCheckCells []string `json:"-"`
TxThrottlerDefaultPriority int `json:"-"`

EnableLagThrottler bool `json:"-"`

Expand Down Expand Up @@ -502,10 +502,10 @@ var defaultConfig = TabletConfig{
CacheResultFields: true,
SignalWhenSchemaChange: true,

EnableTxThrottler: false,
TxThrottlerConfig: defaultTxThrottlerConfig(),
TxThrottlerHealthCheckCells: []string{},
TxThrottlerDefaultCriticality: 0, // This leads to all queries being candidates to throttle
EnableTxThrottler: false,
TxThrottlerConfig: defaultTxThrottlerConfig(),
TxThrottlerHealthCheckCells: []string{},
TxThrottlerDefaultPriority: 0, // This leads to all queries being candidates to throttle

EnableLagThrottler: false, // Feature flag; to switch to 'true' at some stage in the future

Expand Down
19 changes: 13 additions & 6 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,14 +487,21 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, preQ
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
startTime := time.Now()
criticality := tsv.config.TxThrottlerDefaultCriticality
if options != nil && options.Criticality != "" {
optionsCriticality, err := strconv.Atoi(options.Criticality)
if err == nil {
criticality = optionsCriticality
priority := tsv.config.TxThrottlerDefaultPriority
if options != nil && options.Priority != "" {
optionsPriority, err := strconv.Atoi(options.Priority)
// This should never error out, as the value for Priority has been validated in the vtgate already.
// Still, handle it just to make sure.
if err != nil {
log.Errorf(
"The value of the %s query directive could not be converted to integer, using the "+
"default value. Error was: %s",
sqlparser.DirectivePriority, priority, err)
} else {
priority = optionsPriority
}
}
if tsv.txThrottler.Throttle(criticality) {
if tsv.txThrottler.Throttle(priority) {
return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "Transaction throttled")
}
var beginSQL string
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,17 +268,17 @@ func (t *TxThrottler) Close() {
// It returns true if the transaction should not proceed (the caller
// should back off). Throttle requires that Open() was previously called
// successfully.
func (t *TxThrottler) Throttle(criticality int) (result bool) {
func (t *TxThrottler) Throttle(priority int) (result bool) {
if !t.config.enabled {
return false
}
if t.state == nil {
panic("BUG: Throttle() called on a closed TxThrottler")
}

// Throttle according to both what the throttle state says, and the criticality. Workloads with higher criticality
// Throttle according to both what the throttle state says, and the priority. Workloads with higher priority
// are less likely to be throttled.
result = t.state.throttle() && rand.Intn(sqlparser.MaxCriticalityValue) > criticality
result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) > priority
t.requestsTotal.Add(1)
if result {
t.requestsThrottled.Add(1)
Expand Down
Loading

0 comments on commit cfdbd1e

Please sign in to comment.