Skip to content

Commit

Permalink
slack-15.0: backport required Transaction Throttler PRs, pt. 4 (#371)
Browse files Browse the repository at this point in the history
* Skip recalculating the rate in MaxReplicationLagModule when it can't be done (vitessio#12620)

* Skip recalculating the rate in MaxReplicationLagModule when it can't be done

This defends against lag records with nil stats which can lead to segfaults.
See vitessio#12619

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Address PR comments.

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Make linter happy

Signed-off-by: Eduardo J. Ortega U <[email protected]>

---------

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* Throttled transactions return MySQL error code 1041 ER_OUT_OF_RESOURCES (vitessio#12949)

This error code seems better suited to represent the fact that transactions are
being throttled by the server due to some form of resource contention than the
current code 1203 ER_TOO_MANY_USER_CONNECTIONS.

Signed-off-by: Eduardo J. Ortega U <[email protected]>

* MaxReplicationLagModule.recalculateRate no longer fills the log (vitessio#14875)

Signed-off-by: Eduardo J. Ortega U <[email protected]>

---------

Signed-off-by: Eduardo J. Ortega U <[email protected]>
Co-authored-by: Eduardo J. Ortega U <[email protected]>
  • Loading branch information
timvaillancourt and ejortegau authored May 28, 2024
1 parent bb8b2be commit a3c6f97
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 3 deletions.
9 changes: 8 additions & 1 deletion go/mysql/sql_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"regexp"
"strconv"
"strings"

"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -135,7 +136,11 @@ func mapToSQLErrorFromErrorCode(err error, msg string) *SQLError {
ss = SSAccessDeniedError
case vtrpcpb.Code_RESOURCE_EXHAUSTED:
num = demuxResourceExhaustedErrors(err.Error())
ss = SSClientError
// 1041 ER_OUT_OF_RESOURCES has SQLSTATE HYOOO as per https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html#error_er_out_of_resources,
// so don't override it here in that case.
if num != EROutOfResources {
ss = SSClientError
}
case vtrpcpb.Code_UNIMPLEMENTED:
num = ERNotSupportedYet
ss = SSClientError
Expand Down Expand Up @@ -223,6 +228,8 @@ func demuxResourceExhaustedErrors(msg string) int {
switch {
case isGRPCOverflowRE.Match([]byte(msg)):
return ERNetPacketTooLarge
case strings.Contains(msg, "Transaction throttled"):
return EROutOfResources
default:
return ERTooManyUserConnections
}
Expand Down
8 changes: 7 additions & 1 deletion go/mysql/sql_error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/stretchr/testify/assert"
)

func TestDumuxResourceExhaustedErrors(t *testing.T) {
func TestDemuxResourceExhaustedErrors(t *testing.T) {
type testCase struct {
msg string
want int
Expand All @@ -42,6 +42,7 @@ func TestDumuxResourceExhaustedErrors(t *testing.T) {
// This should be explicitly handled by returning ERNetPacketTooLarge from the execturo directly
// and therefore shouldn't need to be teased out of another error.
{"in-memory row count exceeded allowed limit of 13", ERTooManyUserConnections},
{"rpc error: code = ResourceExhausted desc = Transaction throttled", EROutOfResources},
}

for _, c := range cases {
Expand Down Expand Up @@ -151,6 +152,11 @@ func TestNewSQLErrorFromError(t *testing.T) {
num: ERNoDb,
ss: SSNoDB,
},
{
err: vterrors.Errorf(vtrpc.Code_RESOURCE_EXHAUSTED, "vttablet: rpc error: code = ResourceExhausted desc = Transaction throttled"),
num: EROutOfResources,
ss: SSUnknownSQLState,
},
}

for _, tc := range tCases {
Expand Down
7 changes: 6 additions & 1 deletion go/vt/throttler/max_replication_lag_module.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,12 @@ func (m *MaxReplicationLagModule) recalculateRate(lagRecordNow replicationLagRec
if lagRecordNow.isZero() {
panic("rate recalculation was triggered with a zero replication lag record")
}

// Protect against nil stats
if lagRecordNow.Stats == nil {
return
}

now := lagRecordNow.time
lagNow := lagRecordNow.lag()

Expand Down Expand Up @@ -375,7 +381,6 @@ logResult:
r.Reason += clearReason
}

log.Infof("%v", r)
m.results.add(r)
}

Expand Down
49 changes: 49 additions & 0 deletions go/vt/throttler/max_replication_lag_module_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/vt/discovery"
Expand Down Expand Up @@ -83,6 +85,12 @@ func (tf *testFixture) process(lagRecord replicationLagRecord) {
tf.m.processRecord(lagRecord)
}

// recalculateRate does the same thing as MaxReplicationLagModule.recalculateRate() does
// for a new "lagRecord".
func (tf *testFixture) recalculateRate(lagRecord replicationLagRecord) {
tf.m.recalculateRate(lagRecord)
}

func (tf *testFixture) checkState(state state, rate int64, lastRateChange time.Time) error {
if got, want := tf.m.currentState, state; got != want {
return fmt.Errorf("module in wrong state. got = %v, want = %v", got, want)
Expand All @@ -96,6 +104,47 @@ func (tf *testFixture) checkState(state state, rate int64, lastRateChange time.T
return nil
}

func TestNewMaxReplicationLagModule_recalculateRate(t *testing.T) {
testCases := []struct {
name string
lagRecord replicationLagRecord
expectPanic bool
}{
{
name: "Zero lag",
lagRecord: replicationLagRecord{
time: time.Time{},
TabletHealth: discovery.TabletHealth{Stats: nil},
},
expectPanic: true,
},
{
name: "nil lag record stats",
lagRecord: replicationLagRecord{
time: time.Now(),
TabletHealth: discovery.TabletHealth{Stats: nil},
},
expectPanic: false,
},
}

for _, aTestCase := range testCases {
theCase := aTestCase

t.Run(theCase.name, func(t *testing.T) {
t.Parallel()

fixture, err := newTestFixtureWithMaxReplicationLag(5)
assert.NoError(t, err)

if theCase.expectPanic {
assert.Panics(t, func() { fixture.recalculateRate(theCase.lagRecord) })
}
},
)
}
}

func TestMaxReplicationLagModule_RateNotZeroWhenDisabled(t *testing.T) {
tf, err := newTestFixtureWithMaxReplicationLag(ReplicationLagModuleDisabled)
if err != nil {
Expand Down

0 comments on commit a3c6f97

Please sign in to comment.