From a3c6f973e2c87497635da6c24aec00f664920054 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Tue, 28 May 2024 20:32:07 +0200 Subject: [PATCH] `slack-15.0`: backport required Transaction Throttler PRs, pt. 4 (#371) * Skip recalculating the rate in MaxReplicationLagModule when it can't be done (#12620) * Skip recalculating the rate in MaxReplicationLagModule when it can't be done This defends against lag records with nil stats which can lead to segfaults. See https://github.com/vitessio/vitess/issues/12619 Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> * Address PR comments. Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> * Make linter happy Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> --------- Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> * Throttled transactions return MySQL error code 1041 ER_OUT_OF_RESOURCES (#12949) This error code seems better suited to represent the fact that transactions are being throttled by the server due to some form of resource contention than the current code 1203 ER_TOO_MANY_USER_CONNECTIONS. Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> * MaxReplicationLagModule.recalculateRate no longer fills the log (#14875) Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> --------- Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> Co-authored-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> --- go/mysql/sql_error.go | 9 +++- go/mysql/sql_error_test.go | 8 ++- go/vt/throttler/max_replication_lag_module.go | 7 ++- .../max_replication_lag_module_test.go | 49 +++++++++++++++++++ 4 files changed, 70 insertions(+), 3 deletions(-) diff --git a/go/mysql/sql_error.go b/go/mysql/sql_error.go index 22cd2c2ae9e..347c1abcdad 100644 --- a/go/mysql/sql_error.go +++ b/go/mysql/sql_error.go @@ -21,6 +21,7 @@ import ( "fmt" "regexp" "strconv" + "strings" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" @@ -135,7 +136,11 @@ func mapToSQLErrorFromErrorCode(err error, msg string) *SQLError { ss = SSAccessDeniedError case vtrpcpb.Code_RESOURCE_EXHAUSTED: num = demuxResourceExhaustedErrors(err.Error()) - ss = SSClientError + // 1041 ER_OUT_OF_RESOURCES has SQLSTATE HYOOO as per https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html#error_er_out_of_resources, + // so don't override it here in that case. + if num != EROutOfResources { + ss = SSClientError + } case vtrpcpb.Code_UNIMPLEMENTED: num = ERNotSupportedYet ss = SSClientError @@ -223,6 +228,8 @@ func demuxResourceExhaustedErrors(msg string) int { switch { case isGRPCOverflowRE.Match([]byte(msg)): return ERNetPacketTooLarge + case strings.Contains(msg, "Transaction throttled"): + return EROutOfResources default: return ERTooManyUserConnections } diff --git a/go/mysql/sql_error_test.go b/go/mysql/sql_error_test.go index c6fe2f65251..e3b6edf47cc 100644 --- a/go/mysql/sql_error_test.go +++ b/go/mysql/sql_error_test.go @@ -25,7 +25,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestDumuxResourceExhaustedErrors(t *testing.T) { +func TestDemuxResourceExhaustedErrors(t *testing.T) { type testCase struct { msg string want int @@ -42,6 +42,7 @@ func TestDumuxResourceExhaustedErrors(t *testing.T) { // This should be explicitly handled by returning ERNetPacketTooLarge from the execturo directly // and therefore shouldn't need to be teased out of another error. {"in-memory row count exceeded allowed limit of 13", ERTooManyUserConnections}, + {"rpc error: code = ResourceExhausted desc = Transaction throttled", EROutOfResources}, } for _, c := range cases { @@ -151,6 +152,11 @@ func TestNewSQLErrorFromError(t *testing.T) { num: ERNoDb, ss: SSNoDB, }, + { + err: vterrors.Errorf(vtrpc.Code_RESOURCE_EXHAUSTED, "vttablet: rpc error: code = ResourceExhausted desc = Transaction throttled"), + num: EROutOfResources, + ss: SSUnknownSQLState, + }, } for _, tc := range tCases { diff --git a/go/vt/throttler/max_replication_lag_module.go b/go/vt/throttler/max_replication_lag_module.go index f8037f7f975..e1a76f89c57 100644 --- a/go/vt/throttler/max_replication_lag_module.go +++ b/go/vt/throttler/max_replication_lag_module.go @@ -301,6 +301,12 @@ func (m *MaxReplicationLagModule) recalculateRate(lagRecordNow replicationLagRec if lagRecordNow.isZero() { panic("rate recalculation was triggered with a zero replication lag record") } + + // Protect against nil stats + if lagRecordNow.Stats == nil { + return + } + now := lagRecordNow.time lagNow := lagRecordNow.lag() @@ -375,7 +381,6 @@ logResult: r.Reason += clearReason } - log.Infof("%v", r) m.results.add(r) } diff --git a/go/vt/throttler/max_replication_lag_module_test.go b/go/vt/throttler/max_replication_lag_module_test.go index f0324df192c..6379b067412 100644 --- a/go/vt/throttler/max_replication_lag_module_test.go +++ b/go/vt/throttler/max_replication_lag_module_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/discovery" @@ -83,6 +85,12 @@ func (tf *testFixture) process(lagRecord replicationLagRecord) { tf.m.processRecord(lagRecord) } +// recalculateRate does the same thing as MaxReplicationLagModule.recalculateRate() does +// for a new "lagRecord". +func (tf *testFixture) recalculateRate(lagRecord replicationLagRecord) { + tf.m.recalculateRate(lagRecord) +} + func (tf *testFixture) checkState(state state, rate int64, lastRateChange time.Time) error { if got, want := tf.m.currentState, state; got != want { return fmt.Errorf("module in wrong state. got = %v, want = %v", got, want) @@ -96,6 +104,47 @@ func (tf *testFixture) checkState(state state, rate int64, lastRateChange time.T return nil } +func TestNewMaxReplicationLagModule_recalculateRate(t *testing.T) { + testCases := []struct { + name string + lagRecord replicationLagRecord + expectPanic bool + }{ + { + name: "Zero lag", + lagRecord: replicationLagRecord{ + time: time.Time{}, + TabletHealth: discovery.TabletHealth{Stats: nil}, + }, + expectPanic: true, + }, + { + name: "nil lag record stats", + lagRecord: replicationLagRecord{ + time: time.Now(), + TabletHealth: discovery.TabletHealth{Stats: nil}, + }, + expectPanic: false, + }, + } + + for _, aTestCase := range testCases { + theCase := aTestCase + + t.Run(theCase.name, func(t *testing.T) { + t.Parallel() + + fixture, err := newTestFixtureWithMaxReplicationLag(5) + assert.NoError(t, err) + + if theCase.expectPanic { + assert.Panics(t, func() { fixture.recalculateRate(theCase.lagRecord) }) + } + }, + ) + } +} + func TestMaxReplicationLagModule_RateNotZeroWhenDisabled(t *testing.T) { tf, err := newTestFixtureWithMaxReplicationLag(ReplicationLagModuleDisabled) if err != nil {