Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

slack-vitess-r14.0.5-dsdefense: backport directive rename from vitessio/vitess#12662 #72

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -935,8 +935,8 @@ age_bad_rate_after_sec: 180
bad_rate_increase: 0.1
max_rate_approach_threshold: 0.9
)
--tx-throttler-default-criticality int
Default criticality assigned to queries that lack criticality information.
--tx-throttler-default-priority int
Default priority assigned to queries that lack priority information.
--tx-throttler-healthcheck-cells value
Synonym to -tx_throttler_healthcheck_cells
--tx_throttler_config string
Expand Down
1,347 changes: 673 additions & 674 deletions go/vt/proto/query/query.pb.go

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions go/vt/proto/query/query_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 16 additions & 16 deletions go/vt/sqlparser/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@ const (
DirectiveQueryPlanner = "PLANNER"
// DirectiveWorkloadName specifies the name of the client application workload issuing the query.
DirectiveWorkloadName = "WORKLOAD_NAME"
// DirectiveCriticality specifies the criticality of a workload. It should be an integer between 0 and 100, where
// 100 is the highest criticality, and 0 is the lowest one.
DirectiveCriticality = "CRITICALITY"
// DirectivePriority specifies the priority of a workload. It should be an integer between 0 and 100, where
// 100 is the highest priority, and 0 is the lowest one.
DirectivePriority = "PRIORITY"

// MaxCriticalityValue specifies the maximum value allowed for criticality. Valid criticality values are
// between zero and MaxCriticalityValue.
MaxCriticalityValue = 100
// MaxPriorityValue specifies the maximum value allowed for priority. Valid priority values are
// between zero and MaxPriorityValue.
MaxPriorityValue = 100
)

var ErrInvalidCriticality = vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Invalid criticality value specified in query")
var ErrInvalidPriority = vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "Invalid priority value specified in query")

func isNonSpace(r rune) bool {
return !unicode.IsSpace(r)
Expand Down Expand Up @@ -384,27 +384,27 @@ func CommentsForStatement(stmt Statement) Comments {
return nil
}

// GetCriticalityFromStatement gets the criticality from the provided Statement, using DirectiveCriticality
func GetCriticalityFromStatement(statement Statement) (string, error) {
// GetPriorityFromStatement gets the priority from the provided Statement, using DirectivePriority
func GetPriorityFromStatement(statement Statement) (string, error) {
commentedStatement, ok := statement.(Commented)
// This would mean that the statement lacks comments, so we can't obtain the workload from it. Hence default to
// empty criticality
// empty priority
if !ok {
return "", nil
}

directives := commentedStatement.GetParsedComments().Directives()
criticality := directives.GetString(DirectiveCriticality, "")
if !ok || criticality == "" {
priority := directives.GetString(DirectivePriority, "")
if priority == "" {
return "", nil
}

intCriticality, err := strconv.Atoi(criticality)
if err != nil || intCriticality < 0 || intCriticality > MaxCriticalityValue {
return "", ErrInvalidCriticality
intPriority, err := strconv.Atoi(priority)
if err != nil || intPriority < 0 || intPriority > MaxPriorityValue {
return "", ErrInvalidPriority
}

return criticality, nil
return priority, nil
}

// GetWorkloadNameFromStatement gets the workload name from the provided Statement, using workloadLabel as the name of
Expand Down
60 changes: 30 additions & 30 deletions go/vt/sqlparser/comments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,51 +480,51 @@ func TestIgnoreMaxMaxMemoryRowsDirective(t *testing.T) {
}
}

func TestGetCriticalityFromStatement(t *testing.T) {
func TestGetPriorityFromStatement(t *testing.T) {
testCases := []struct {
query string
expectedCriticality string
expectedError error
query string
expectedPriority string
expectedError error
}{
{
query: "select * from a_table",
expectedCriticality: "",
expectedError: nil,
query: "select * from a_table",
expectedPriority: "",
expectedError: nil,
},
{
query: "select /*vt+ ANOTHER_DIRECTIVE=324 */ * from another_table",
expectedCriticality: "",
expectedError: nil,
query: "select /*vt+ ANOTHER_DIRECTIVE=324 */ * from another_table",
expectedPriority: "",
expectedError: nil,
},
{
query: "select /*vt+ CRITICALITY=33 */ * from another_table",
expectedCriticality: "33",
expectedError: nil,
query: "select /*vt+ PRIORITY=33 */ * from another_table",
expectedPriority: "33",
expectedError: nil,
},
{
query: "select /*vt+ CRITICALITY=200 */ * from another_table",
expectedCriticality: "",
expectedError: ErrInvalidCriticality,
query: "select /*vt+ PRIORITY=200 */ * from another_table",
expectedPriority: "",
expectedError: ErrInvalidPriority,
},
{
query: "select /*vt+ CRITICALITY=-1 */ * from another_table",
expectedCriticality: "",
expectedError: ErrInvalidCriticality,
query: "select /*vt+ PRIORITY=-1 */ * from another_table",
expectedPriority: "",
expectedError: ErrInvalidPriority,
},
{
query: "select /*vt+ CRITICALITY=some_text */ * from another_table",
expectedCriticality: "",
expectedError: ErrInvalidCriticality,
query: "select /*vt+ PRIORITY=some_text */ * from another_table",
expectedPriority: "",
expectedError: ErrInvalidPriority,
},
{
query: "select /*vt+ CRITICALITY=0 */ * from another_table",
expectedCriticality: "0",
expectedError: nil,
query: "select /*vt+ PRIORITY=0 */ * from another_table",
expectedPriority: "0",
expectedError: nil,
},
{
query: "select /*vt+ CRITICALITY=100 */ * from another_table",
expectedCriticality: "100",
expectedError: nil,
query: "select /*vt+ PRIORITY=100 */ * from another_table",
expectedPriority: "100",
expectedError: nil,
},
}

Expand All @@ -534,12 +534,12 @@ func TestGetCriticalityFromStatement(t *testing.T) {
t.Parallel()
stmt, err := Parse(theThestCase.query)
assert.NoError(t, err)
actualCriticality, actualError := GetCriticalityFromStatement(stmt)
actualPriority, actualError := GetPriorityFromStatement(stmt)
if theThestCase.expectedError != nil {
assert.ErrorIs(t, actualError, theThestCase.expectedError)
} else {
assert.NoError(t, err)
assert.Equal(t, theThestCase.expectedCriticality, actualCriticality)
assert.Equal(t, theThestCase.expectedPriority, actualPriority)
}
})
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (t *noopVCursor) SetPlannerVersion(querypb.ExecuteOptions_PlannerVersion) {
panic("implement me")
}

func (t *noopVCursor) SetCriticality(string) {
func (t *noopVCursor) SetPriority(string) {
panic("implement me")
}

Expand Down Expand Up @@ -710,7 +710,7 @@ func (f *loggingVCursor) SetPlannerVersion(querypb.ExecuteOptions_PlannerVersion
panic("implement me")
}

func (f *loggingVCursor) SetCriticality(string) {
func (f *loggingVCursor) SetPriority(string) {
panic("implement me")
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/primitive.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ type (
SetWorkload(querypb.ExecuteOptions_Workload)
SetPlannerVersion(querypb.ExecuteOptions_PlannerVersion)
SetWorkloadName(string)
SetCriticality(string)
SetPriority(string)
SetFoundRows(uint64)

SetDDLStrategy(string)
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,11 +973,11 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser.
}
ignoreMaxMemoryRows := sqlparser.IgnoreMaxMaxMemoryRowsDirective(stmt)
vcursor.SetIgnoreMaxMemoryRows(ignoreMaxMemoryRows)
criticality, err := sqlparser.GetCriticalityFromStatement(stmt)
priority, err := sqlparser.GetPriorityFromStatement(stmt)
if err != nil {
return nil, err
}
vcursor.SetCriticality(criticality)
vcursor.SetPriority(priority)
vcursor.SetWorkloadName(sqlparser.GetWorkloadNameFromStatement(stmt))

setVarComment, err := prepareSetVarComment(vcursor, stmt)
Expand Down
24 changes: 12 additions & 12 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1810,16 +1810,16 @@ func TestGetPlanNormalized(t *testing.T) {
assertCacheContains(t, r, want)
}

func TestGetPlanCriticalityCriticality(t *testing.T) {
func TestGetPlanPriority(t *testing.T) {
testCases := []struct {
name string
sql string
expectedCriticality string
expectedError error
name string
sql string
expectedPriority string
expectedError error
}{
{name: "empty criticality", sql: "select * from music_user_map", expectedCriticality: "", expectedError: nil},
{name: "Invalid criticality", sql: "select /*vt+ CRITICALITY=something */ * from music_user_map", expectedCriticality: "", expectedError: sqlparser.ErrInvalidCriticality},
{name: "Valid criticality", sql: "select /*vt+ CRITICALITY=33 */ * from music_user_map", expectedCriticality: "33", expectedError: nil},
{name: "empty priority", sql: "select * from music_user_map", expectedPriority: "", expectedError: nil},
{name: "Invalid priority", sql: "select /*vt+ PRIORITY=something */ * from music_user_map", expectedPriority: "", expectedError: sqlparser.ErrInvalidPriority},
{name: "Valid priority", sql: "select /*vt+ PRIORITY=33 */ * from music_user_map", expectedPriority: "33", expectedError: nil},
}

for _, aTestCase := range testCases {
Expand All @@ -1832,17 +1832,17 @@ func TestGetPlanCriticalityCriticality(t *testing.T) {
vCursor, err := newVCursorImpl(context.Background(), NewSafeSession(&vtgatepb.Session{TargetString: "@unknown", Options: &querypb.ExecuteOptions{}}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
assert.NoError(t, err)

stmt1, err := sqlparser.Parse(testCase.sql)
stmt, err := sqlparser.Parse(testCase.sql)
assert.NoError(t, err)
crticalityFromStatement, _ := sqlparser.GetCriticalityFromStatement(stmt1)
crticalityFromStatement, _ := sqlparser.GetPriorityFromStatement(stmt)

_, err = r.getPlan(vCursor, testCase.sql, makeComments("/* some comment */"), map[string]*querypb.BindVariable{}, &SafeSession{Session: &vtgatepb.Session{Options: &querypb.ExecuteOptions{}}}, logStats)
if testCase.expectedError != nil {
assert.ErrorIs(t, err, testCase.expectedError)
} else {
assert.NoError(t, err)
assert.Equal(t, testCase.expectedCriticality, crticalityFromStatement)
assert.Equal(t, testCase.expectedCriticality, vCursor.safeSession.Options.Criticality)
assert.Equal(t, testCase.expectedPriority, crticalityFromStatement)
assert.Equal(t, testCase.expectedPriority, vCursor.safeSession.Options.Priority)
}
})
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,9 +748,9 @@ func (vc *vcursorImpl) SetPlannerVersion(v plancontext.PlannerVersion) {
vc.safeSession.GetOrCreateOptions().PlannerVersion = v
}

func (vc *vcursorImpl) SetCriticality(criticality string) {
if criticality != "" {
vc.safeSession.GetOrCreateOptions().Criticality = criticality
func (vc *vcursorImpl) SetPriority(priority string) {
if priority != "" {
vc.safeSession.GetOrCreateOptions().Priority = priority
}
}

Expand Down
18 changes: 9 additions & 9 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func init() {
flagutil.DualFormatBoolVar(&currentConfig.EnableTxThrottler, "enable_tx_throttler", defaultConfig.EnableTxThrottler, "If true replication-lag-based throttling on transactions will be enabled.")
flagutil.DualFormatStringVar(&currentConfig.TxThrottlerConfig, "tx_throttler_config", defaultConfig.TxThrottlerConfig, "The configuration of the transaction throttler as a text formatted throttlerdata.Configuration protocol buffer message")
flagutil.DualFormatStringListVar(&currentConfig.TxThrottlerHealthCheckCells, "tx_throttler_healthcheck_cells", defaultConfig.TxThrottlerHealthCheckCells, "A comma-separated list of cells. Only tabletservers running in these cells will be monitored for replication lag by the transaction throttler.")
flag.IntVar(&currentConfig.TxThrottlerDefaultCriticality, "tx-throttler-default-criticality", defaultConfig.TxThrottlerDefaultCriticality, "Default criticality assigned to queries that lack criticality information.")
flag.IntVar(&currentConfig.TxThrottlerDefaultPriority, "tx-throttler-default-priority", defaultConfig.TxThrottlerDefaultPriority, "Default priority assigned to queries that lack priority information.")

flag.BoolVar(&enableHotRowProtection, "enable_hot_row_protection", false, "If true, incoming transactions for the same row (range) will be queued and cannot consume all txpool slots.")
flag.BoolVar(&enableHotRowProtectionDryRun, "enable_hot_row_protection_dry_run", false, "If true, hot row protection is not enforced but logs if transactions would have been queued.")
Expand Down Expand Up @@ -289,10 +289,10 @@ type TabletConfig struct {
TwoPCCoordinatorAddress string `json:"-"`
TwoPCAbandonAge Seconds `json:"-"`

EnableTxThrottler bool `json:"-"`
TxThrottlerConfig string `json:"-"`
TxThrottlerHealthCheckCells []string `json:"-"`
TxThrottlerDefaultCriticality int `json:"-"`
EnableTxThrottler bool `json:"-"`
TxThrottlerConfig string `json:"-"`
TxThrottlerHealthCheckCells []string `json:"-"`
TxThrottlerDefaultPriority int `json:"-"`

EnableLagThrottler bool `json:"-"`

Expand Down Expand Up @@ -502,10 +502,10 @@ var defaultConfig = TabletConfig{
CacheResultFields: true,
SignalWhenSchemaChange: true,

EnableTxThrottler: false,
TxThrottlerConfig: defaultTxThrottlerConfig(),
TxThrottlerHealthCheckCells: []string{},
TxThrottlerDefaultCriticality: 0, // This leads to all queries being candidates to throttle
EnableTxThrottler: false,
TxThrottlerConfig: defaultTxThrottlerConfig(),
TxThrottlerHealthCheckCells: []string{},
TxThrottlerDefaultPriority: 0, // This leads to all queries being candidates to throttle

EnableLagThrottler: false, // Feature flag; to switch to 'true' at some stage in the future

Expand Down
19 changes: 13 additions & 6 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,14 +487,21 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, preQ
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
startTime := time.Now()
criticality := tsv.config.TxThrottlerDefaultCriticality
if options != nil && options.Criticality != "" {
optionsCriticality, err := strconv.Atoi(options.Criticality)
if err == nil {
criticality = optionsCriticality
priority := tsv.config.TxThrottlerDefaultPriority
if options != nil && options.Priority != "" {
optionsPriority, err := strconv.Atoi(options.Priority)
// This should never error out, as the value for Priority has been validated in the vtgate already.
// Still, handle it just to make sure.
if err != nil {
log.Errorf(
"The value of the %s query directive could not be converted to integer, using the "+
"default value. Error was: %s",
sqlparser.DirectivePriority, priority, err)
} else {
priority = optionsPriority
}
}
if tsv.txThrottler.Throttle(criticality) {
if tsv.txThrottler.Throttle(priority) {
return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "Transaction throttled")
}
var beginSQL string
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,17 +268,17 @@ func (t *TxThrottler) Close() {
// It returns true if the transaction should not proceed (the caller
// should back off). Throttle requires that Open() was previously called
// successfully.
func (t *TxThrottler) Throttle(criticality int) (result bool) {
func (t *TxThrottler) Throttle(priority int) (result bool) {
if !t.config.enabled {
return false
}
if t.state == nil {
panic("BUG: Throttle() called on a closed TxThrottler")
}

// Throttle according to both what the throttle state says, and the criticality. Workloads with higher criticality
// Throttle according to both what the throttle state says, and the priority. Workloads with higher priority
// are less likely to be throttled.
result = t.state.throttle() && rand.Intn(sqlparser.MaxCriticalityValue) > criticality
result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) > priority
t.requestsTotal.Add(1)
if result {
t.requestsThrottled.Add(1)
Expand Down
Loading