From b8ceeb0a1ac83173d73f4aef4f8dbcb96f93a1e1 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Wed, 28 Feb 2024 10:36:11 -0800 Subject: [PATCH 01/10] #5217: added metrics keys for db disconnect event counters --- go.mod | 1 + go.sum | 3 + support/db/metrics.go | 142 +++++++++++++++++++------ support/db/session.go | 6 +- support/db/session_test.go | 210 +++++++++++++++++++++++++++++++++++-- 5 files changed, 322 insertions(+), 40 deletions(-) diff --git a/go.mod b/go.mod index 1ca1ea801c..ff66f5585e 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( firebase.google.com/go v3.12.0+incompatible github.com/2opremio/pretty v0.2.2-0.20230601220618-e1d5758b2a95 github.com/BurntSushi/toml v1.3.2 + github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/Masterminds/squirrel v1.5.4 github.com/Microsoft/go-winio v0.6.1 github.com/adjust/goautoneg v0.0.0-20150426214442-d788f35a0315 diff --git a/go.sum b/go.sum index 6e4158f9b7..89e96c64a0 100644 --- a/go.sum +++ b/go.sum @@ -58,6 +58,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= +github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM= github.com/Masterminds/squirrel v1.5.4/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10= github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= @@ -278,6 +280,7 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7 github.com/jtolds/gls v4.20.0+incompatible/go.mod 
h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/karrick/godirwalk v1.16.1 h1:DynhcF+bztK8gooS0+NDJFrdNZjJ3gzVzC545UNA9iw= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.10.10/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= diff --git a/support/db/metrics.go b/support/db/metrics.go index 0726a85f91..226d71588c 100644 --- a/support/db/metrics.go +++ b/support/db/metrics.go @@ -8,6 +8,7 @@ import ( "time" "github.com/Masterminds/squirrel" + "github.com/jmoiron/sqlx" "github.com/prometheus/client_golang/prometheus" ) @@ -44,20 +45,23 @@ func contextRoute(ctx context.Context) string { type SessionWithMetrics struct { SessionInterface - registry *prometheus.Registry - queryCounter *prometheus.CounterVec - queryDurationSummary *prometheus.SummaryVec - maxOpenConnectionsGauge prometheus.GaugeFunc - openConnectionsGauge prometheus.GaugeFunc - inUseConnectionsGauge prometheus.GaugeFunc - idleConnectionsGauge prometheus.GaugeFunc - waitCountCounter prometheus.CounterFunc - waitDurationCounter prometheus.CounterFunc - maxIdleClosedCounter prometheus.CounterFunc - maxIdleTimeClosedCounter prometheus.CounterFunc - maxLifetimeClosedCounter prometheus.CounterFunc - roundTripProbe *roundTripProbe - roundTripTimeSummary prometheus.Summary + registry *prometheus.Registry + queryCounter *prometheus.CounterVec + queryDurationSummary *prometheus.SummaryVec + maxOpenConnectionsGauge prometheus.GaugeFunc + openConnectionsGauge prometheus.GaugeFunc + inUseConnectionsGauge prometheus.GaugeFunc + idleConnectionsGauge prometheus.GaugeFunc + waitCountCounter prometheus.CounterFunc + waitDurationCounter 
prometheus.CounterFunc + maxIdleClosedCounter prometheus.CounterFunc + maxIdleTimeClosedCounter prometheus.CounterFunc + maxLifetimeClosedCounter prometheus.CounterFunc + roundTripProbe *roundTripProbe + roundTripTimeSummary prometheus.Summary + closedFromClientDisconnect prometheus.Counter + closedFromServerTimeout prometheus.Counter + closedFromStatementTimeout prometheus.Counter } func RegisterMetrics(base *Session, namespace string, sub Subservice, registry *prometheus.Registry) SessionInterface { @@ -226,6 +230,40 @@ func RegisterMetrics(base *Session, namespace string, sub Subservice, registry * ) registry.MustRegister(s.maxLifetimeClosedCounter) + s.closedFromClientDisconnect = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "db", + Name: "client_closed_session_total", + Help: "total number of connections closed due to an upstream caller cancelling the context, " + + "this captures instances where http clients close their connection before receiving a response.", + ConstLabels: prometheus.Labels{"subservice": string(sub)}, + }, + ) + registry.MustRegister(s.closedFromClientDisconnect) + + s.closedFromServerTimeout = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "db", + Name: "server_timeout_closed_session_total", + Help: "total number of connections closed due to the sql taking longer than horizon CONNECTION_TIMEOUT", + ConstLabels: prometheus.Labels{"subservice": string(sub)}, + }, + ) + registry.MustRegister(s.closedFromServerTimeout) + + s.closedFromStatementTimeout = prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "db", + Name: "statement_timeout_closed_session_total", + Help: "total number of connections closed due to the sql statement taking longer than db statement_timeout, which horizon sets based on CONNECTION_TIMEOUT", + ConstLabels: prometheus.Labels{"subservice": string(sub)}, + }, + ) + 
registry.MustRegister(s.closedFromStatementTimeout) + s.roundTripTimeSummary = prometheus.NewSummary( prometheus.SummaryOpts{ Namespace: namespace, @@ -262,14 +300,27 @@ func (s *SessionWithMetrics) Close() error { s.registry.Unregister(s.maxIdleClosedCounter) s.registry.Unregister(s.maxIdleTimeClosedCounter) s.registry.Unregister(s.maxLifetimeClosedCounter) + s.registry.Unregister(s.closedFromClientDisconnect) + s.registry.Unregister(s.closedFromServerTimeout) + s.registry.Unregister(s.closedFromStatementTimeout) return s.SessionInterface.Close() } -// TODO: Implement these -// func (s *SessionWithMetrics) BeginTx(ctx context.Context, opts *sql.TxOptions) error { -// func (s *SessionWithMetrics) Begin(ctx context.Context) error { -// func (s *SessionWithMetrics) Commit(ctx context.Context) error -// func (s *SessionWithMetrics) Rollback(ctx context.Context) error +func (s *SessionWithMetrics) BeginTx(ctx context.Context, opts *sql.TxOptions) error { + return s.applyDisconnectMetrics(s.SessionInterface.BeginTx(ctx, opts)) +} + +func (s *SessionWithMetrics) Begin(ctx context.Context) error { + return s.applyDisconnectMetrics(s.SessionInterface.Begin(ctx)) +} + +func (s *SessionWithMetrics) Commit() error { + return s.applyDisconnectMetrics(s.SessionInterface.Commit()) +} + +func (s *SessionWithMetrics) Rollback() error { + return s.applyDisconnectMetrics(s.SessionInterface.Rollback()) +} func (s *SessionWithMetrics) TruncateTables(ctx context.Context, tables []string) (err error) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { @@ -305,15 +356,18 @@ func (s *SessionWithMetrics) Clone() SessionInterface { queryDurationSummary: s.queryDurationSummary, // txnCounter: s.txnCounter, // txnDurationSummary: s.txnDurationSummary, - maxOpenConnectionsGauge: s.maxOpenConnectionsGauge, - openConnectionsGauge: s.openConnectionsGauge, - inUseConnectionsGauge: s.inUseConnectionsGauge, - idleConnectionsGauge: s.idleConnectionsGauge, - waitCountCounter: 
s.waitCountCounter, - waitDurationCounter: s.waitDurationCounter, - maxIdleClosedCounter: s.maxIdleClosedCounter, - maxIdleTimeClosedCounter: s.maxIdleTimeClosedCounter, - maxLifetimeClosedCounter: s.maxLifetimeClosedCounter, + maxOpenConnectionsGauge: s.maxOpenConnectionsGauge, + openConnectionsGauge: s.openConnectionsGauge, + inUseConnectionsGauge: s.inUseConnectionsGauge, + idleConnectionsGauge: s.idleConnectionsGauge, + waitCountCounter: s.waitCountCounter, + waitDurationCounter: s.waitDurationCounter, + maxIdleClosedCounter: s.maxIdleClosedCounter, + maxIdleTimeClosedCounter: s.maxIdleTimeClosedCounter, + maxLifetimeClosedCounter: s.maxLifetimeClosedCounter, + closedFromClientDisconnect: s.closedFromClientDisconnect, + closedFromServerTimeout: s.closedFromServerTimeout, + closedFromStatementTimeout: s.closedFromStatementTimeout, } } @@ -356,6 +410,23 @@ func getQueryType(ctx context.Context, query squirrel.Sqlizer) QueryType { return UndefinedQueryType } +func (s *SessionWithMetrics) applyDisconnectMetrics(err error) error { + if err == nil { + return nil + } + + switch err { + case ErrCancelled: + s.closedFromClientDisconnect.Inc() + case ErrStatementTimeout: + s.closedFromStatementTimeout.Inc() + case ErrTimeout: + s.closedFromServerTimeout.Inc() + } + + return err +} + func (s *SessionWithMetrics) Get(ctx context.Context, dest interface{}, query squirrel.Sqlizer) (err error) { queryType := string(getQueryType(ctx, query)) timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { @@ -375,7 +446,7 @@ func (s *SessionWithMetrics) Get(ctx context.Context, dest interface{}, query sq }() err = s.SessionInterface.Get(ctx, dest, query) - return err + return s.applyDisconnectMetrics(err) } func (s *SessionWithMetrics) GetRaw(ctx context.Context, dest interface{}, query string, args ...interface{}) (err error) { @@ -401,13 +472,22 @@ func (s *SessionWithMetrics) Select(ctx context.Context, dest interface{}, query }() err = s.SessionInterface.Select(ctx, 
dest, query) - return err + return s.applyDisconnectMetrics(err) } func (s *SessionWithMetrics) SelectRaw(ctx context.Context, dest interface{}, query string, args ...interface{}) (err error) { return s.Select(ctx, dest, squirrel.Expr(query, args...)) } +func (s *SessionWithMetrics) Query(ctx context.Context, query squirrel.Sqlizer) (*sqlx.Rows, error) { + result, err := s.SessionInterface.Query(ctx, query) + return result, s.applyDisconnectMetrics(err) +} + +func (s *SessionWithMetrics) QueryRaw(ctx context.Context, query string, args ...interface{}) (*sqlx.Rows, error) { + return s.Query(ctx, squirrel.Expr(query, args...)) +} + func (s *SessionWithMetrics) Exec(ctx context.Context, query squirrel.Sqlizer) (result sql.Result, err error) { queryType := string(getQueryType(ctx, query)) timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { @@ -427,7 +507,7 @@ func (s *SessionWithMetrics) Exec(ctx context.Context, query squirrel.Sqlizer) ( }() result, err = s.SessionInterface.Exec(ctx, query) - return result, err + return result, s.applyDisconnectMetrics(err) } func (s *SessionWithMetrics) ExecRaw(ctx context.Context, query string, args ...interface{}) (result sql.Result, err error) { diff --git a/support/db/session.go b/support/db/session.go index 4ad0bc86b5..faddab5dc8 100644 --- a/support/db/session.go +++ b/support/db/session.go @@ -243,10 +243,12 @@ func (s *Session) replaceWithKnownError(err error, ctx context.Context) error { case ctx.Err() == context.Canceled: return ErrCancelled case ctx.Err() == context.DeadlineExceeded: - // if libpq waits too long to obtain conn from pool, can get ctx timeout before server trip + // when horizon's context times out(it's set to app connection-timeout), it triggers pg to emit "pq: canceling statement due to user request" + // so, in order of precedence, check the ctx deadline first to classify this as a timeout, not a cancel return ErrTimeout case strings.Contains(err.Error(), "pq: canceling statement due to 
user request"): - return ErrTimeout + // this is from an external initiated cancel signal sent to pg on a running sql query + return ErrCancelled case strings.Contains(err.Error(), "pq: canceling statement due to conflict with recovery"): return ErrConflictWithRecovery case strings.Contains(err.Error(), "driver: bad connection"): diff --git a/support/db/session_test.go b/support/db/session_test.go index 8629b2ca7e..a048c381db 100644 --- a/support/db/session_test.go +++ b/support/db/session_test.go @@ -2,15 +2,22 @@ package db import ( "context" + "fmt" + "sync" "testing" "time" + "github.com/DATA-DOG/go-sqlmock" + "github.com/jmoiron/sqlx" + "github.com/prometheus/client_golang/prometheus" + "github.com/stellar/go/support/collections/set" "github.com/stellar/go/support/db/dbtest" + "github.com/stellar/go/support/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestServerTimeout(t *testing.T) { +func TestContextTimeoutDuringSql(t *testing.T) { db := dbtest.Postgres(t).Load(testSchema) defer db.Close() @@ -19,32 +26,221 @@ func TestServerTimeout(t *testing.T) { ctx, cancel = context.WithTimeout(ctx, time.Duration(1)) assert := assert.New(t) - sess := &Session{DB: db.Open()} - defer sess.DB.Close() + sessRaw := &Session{DB: db.Open()} + reg := prometheus.NewRegistry() + + sess := RegisterMetrics(sessRaw, "test", "subtest", reg) + defer sess.Close() defer cancel() var count int err := sess.GetRaw(ctx, &count, "SELECT pg_sleep(2), COUNT(*) FROM people") assert.ErrorIs(err, ErrTimeout, "long running db server operation past context timeout, should return timeout") + + metrics, err := reg.Gather() + assert.NoError(err) + + for _, metricFamily := range metrics { + if metricFamily.GetName() == "test_db_server_timeout_closed_session_total" { + assert.Len(metricFamily.GetMetric(), 1) + assert.Equal(metricFamily.GetMetric()[0].GetCounter().GetValue(), float64(1)) + return + } + } + assert.Fail("server_timeout_closed_session_total 
metrics were not correct") } -func TestUserCancel(t *testing.T) { +func TestContextTimeoutBeforeSql(t *testing.T) { db := dbtest.Postgres(t).Load(testSchema) defer db.Close() + var cancel context.CancelFunc + ctx := context.Background() + ctx, cancel = context.WithTimeout(ctx, time.Second) + assert := assert.New(t) + + sessRaw := &Session{DB: db.Open()} + reg := prometheus.NewRegistry() + + sess := RegisterMetrics(sessRaw, "test", "subtest", reg) + defer sess.Close() + defer cancel() + + var count int + time.Sleep(time.Second) + err := sess.GetRaw(ctx, &count, "SELECT pg_sleep(5) FROM people") + assert.ErrorIs(err, ErrTimeout, "any db server operation should return error immediately if context already timed out") + + metrics, err := reg.Gather() + assert.NoError(err) + + for _, metricFamily := range metrics { + if metricFamily.GetName() == "test_db_server_timeout_closed_session_total" { + assert.Len(metricFamily.GetMetric(), 1) + assert.Equal(metricFamily.GetMetric()[0].GetCounter().GetValue(), float64(1)) + return + } + } + assert.Fail("server_timeout_closed_session_total metrics were not correct") +} + +func TestExternallyCancelledSql(t *testing.T) { var cancel context.CancelFunc ctx := context.Background() ctx, cancel = context.WithCancel(ctx) assert := assert.New(t) - sess := &Session{DB: db.Open()} - defer sess.DB.Close() + mockDB, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + defer mockDB.Close() + sqlxDB := sqlx.NewDb(mockDB, "sqlmock") + + sessRaw := &Session{DB: sqlxDB} + reg := prometheus.NewRegistry() + + sess := RegisterMetrics(sessRaw, "test", "subtest", reg) + + defer sess.Close() + defer cancel() + + // simulate an external(not originating from horizon) cancelled sql statement, such as cancelled from psql usage + mock.ExpectQuery("SELECT 1").WillReturnError(errors.New("pq: canceling statement due to user request")) + + var count int + err = sess.GetRaw(ctx, 
&count, "SELECT 1") + assert.ErrorIs(err, ErrCancelled, "externally cancelled sql statement, should return cancelled error") + + metrics, err := reg.Gather() + assert.NoError(err) + + for _, metricFamily := range metrics { + if metricFamily.GetName() == "test_db_client_closed_session_total" { + assert.Len(metricFamily.GetMetric(), 1) + assert.Equal(metricFamily.GetMetric()[0].GetCounter().GetValue(), float64(1)) + return + } + } + assert.Fail("client_closed_session_total metrics were not correct") +} + +func TestSqlStatementTimeout(t *testing.T) { + var cancel context.CancelFunc + ctx := context.Background() + ctx, cancel = context.WithCancel(ctx) + assert := assert.New(t) + + mockDB, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) + } + defer mockDB.Close() + sqlxDB := sqlx.NewDb(mockDB, "sqlmock") + + sessRaw := &Session{DB: sqlxDB} + reg := prometheus.NewRegistry() + + sess := RegisterMetrics(sessRaw, "test", "subtest", reg) + + defer sess.Close() + defer cancel() + + // simulate pg statement timeout + mock.ExpectQuery("SELECT 1").WillReturnError(errors.New("pq: canceling statement due to statement timeout")) + + var count int + err = sess.GetRaw(ctx, &count, "SELECT 1") + assert.ErrorIs(err, ErrStatementTimeout, "sql statement timeout, should return timeout error") + + metrics, err := reg.Gather() + assert.NoError(err) + + for _, metricFamily := range metrics { + if metricFamily.GetName() == "test_db_statement_timeout_closed_session_total" { + assert.Len(metricFamily.GetMetric(), 1) + assert.Equal(metricFamily.GetMetric()[0].GetCounter().GetValue(), float64(1)) + return + } + } + assert.Fail("statement_timeout_closed_session_total metrics were not correct") +} + +func TestContextCancelledBeforeSql(t *testing.T) { + db := dbtest.Postgres(t).Load(testSchema) + defer db.Close() + + var cancel context.CancelFunc + ctx := context.Background() + ctx, cancel = 
context.WithCancel(ctx) + assert := assert.New(t) + + sessRaw := &Session{DB: db.Open()} + reg := prometheus.NewRegistry() + + sess := RegisterMetrics(sessRaw, "test", "subtest", reg) + defer sess.Close() defer cancel() var count int cancel() err := sess.GetRaw(ctx, &count, "SELECT pg_sleep(2), COUNT(*) FROM people") - assert.ErrorIs(err, ErrCancelled, "any ongoing db server operation should return error immediately after user cancel") + assert.ErrorIs(err, ErrCancelled, "any db server operation should return error immediately if user already cancel") + + metrics, err := reg.Gather() + assert.NoError(err) + + for _, metricFamily := range metrics { + if metricFamily.GetName() == "test_db_client_closed_session_total" { + assert.Len(metricFamily.GetMetric(), 1) + assert.Equal(metricFamily.GetMetric()[0].GetCounter().GetValue(), float64(1)) + return + } + } + assert.Fail("client_closed_session_total metrics were not correct") +} + +func TestContextCancelDuringSql(t *testing.T) { + db := dbtest.Postgres(t).Load(testSchema) + defer db.Close() + + var cancel context.CancelFunc + ctx := context.Background() + ctx, cancel = context.WithCancel(ctx) + assert := assert.New(t) + + sessRaw := &Session{DB: db.Open()} + reg := prometheus.NewRegistry() + + sess := RegisterMetrics(sessRaw, "test", "subtest", reg) + defer sess.Close() + defer cancel() + + var count int + var wg sync.WaitGroup + wg.Add(1) + + go func() { + err := sess.GetRaw(ctx, &count, "SELECT pg_sleep(5) FROM people") + assert.ErrorIs(err, ErrCancelled, "any ongoing db server operation should return error immediately after user cancel") + wg.Done() + }() + time.Sleep(time.Second) + cancel() + + require.Eventually(t, func() bool { wg.Wait(); return true }, 5*time.Second, time.Second) + + metrics, err := reg.Gather() + assert.NoError(err) + + for _, metricFamily := range metrics { + if metricFamily.GetName() == "test_db_client_closed_session_total" { + assert.Len(metricFamily.GetMetric(), 1) + 
assert.Equal(metricFamily.GetMetric()[0].GetCounter().GetValue(), float64(1)) + return + } + } + assert.Fail("client_closed_session_total metrics were not correct") } func TestSession(t *testing.T) { From 8f17c48f84462d590c23750fe14ad502b546a346 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Wed, 28 Feb 2024 10:57:51 -0800 Subject: [PATCH 02/10] #5217: fix govet warning --- support/db/session_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/support/db/session_test.go b/support/db/session_test.go index a048c381db..b756dde837 100644 --- a/support/db/session_test.go +++ b/support/db/session_test.go @@ -2,7 +2,6 @@ package db import ( "context" - "fmt" "sync" "testing" "time" @@ -10,7 +9,6 @@ import ( "github.com/DATA-DOG/go-sqlmock" "github.com/jmoiron/sqlx" "github.com/prometheus/client_golang/prometheus" - "github.com/stellar/go/support/collections/set" "github.com/stellar/go/support/db/dbtest" "github.com/stellar/go/support/errors" "github.com/stretchr/testify/assert" From 2a247769b319fbd339426aeff88568e8b3d0cdac Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Wed, 28 Feb 2024 23:43:26 -0800 Subject: [PATCH 03/10] #5217: consolidate to one metrics key and labels for discriminators of error types --- go.mod | 1 - go.sum | 3 - support/db/main.go | 8 +- support/db/metrics.go | 174 +++++++++++------------- support/db/session.go | 59 ++++++--- support/db/session_test.go | 262 +++++++++++++++---------------------- 6 files changed, 230 insertions(+), 277 deletions(-) diff --git a/go.mod b/go.mod index c6a1efc83b..0e55add07c 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,6 @@ require ( firebase.google.com/go v3.12.0+incompatible github.com/2opremio/pretty v0.2.2-0.20230601220618-e1d5758b2a95 github.com/BurntSushi/toml v1.3.2 - github.com/DATA-DOG/go-sqlmock v1.5.2 github.com/Masterminds/squirrel v1.5.4 github.com/Microsoft/go-winio v0.6.1 github.com/adjust/goautoneg v0.0.0-20150426214442-d788f35a0315 diff --git a/go.sum b/go.sum index 0904cb1090..1e31cfdb08 
100644 --- a/go.sum +++ b/go.sum @@ -58,8 +58,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= -github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM= github.com/Masterminds/squirrel v1.5.4/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10= github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow= @@ -284,7 +282,6 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/karrick/godirwalk v1.16.1 h1:DynhcF+bztK8gooS0+NDJFrdNZjJ3gzVzC545UNA9iw= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/kisielk/sqlstruct v0.0.0-20201105191214-5f3e10d3ab46/go.mod h1:yyMNCyc/Ib3bDTKd379tNMpB/7/H5TjM2Y9QJ5THLbE= github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.10.10/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= diff --git a/support/db/main.go b/support/db/main.go index 2fb1f18a10..bb16083362 100644 --- a/support/db/main.go +++ b/support/db/main.go @@ -118,10 +118,14 @@ type Session struct { // DB is the database connection that queries should be executed against. 
DB *sqlx.DB - tx *sqlx.Tx - txOptions *sql.TxOptions + tx *sqlx.Tx + txOptions *sql.TxOptions + errorHandlers []ErrorHandlerFunc } +// dbErr - the Postgres error +// callerContext - the caller's context +type ErrorHandlerFunc func(dbErr error, callerContext context.Context) type SessionInterface interface { BeginTx(ctx context.Context, opts *sql.TxOptions) error Begin(ctx context.Context) error diff --git a/support/db/metrics.go b/support/db/metrics.go index 226d71588c..65e915524b 100644 --- a/support/db/metrics.go +++ b/support/db/metrics.go @@ -9,6 +9,7 @@ import ( "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" + "github.com/lib/pq" "github.com/prometheus/client_golang/prometheus" ) @@ -45,23 +46,21 @@ func contextRoute(ctx context.Context) string { type SessionWithMetrics struct { SessionInterface - registry *prometheus.Registry - queryCounter *prometheus.CounterVec - queryDurationSummary *prometheus.SummaryVec - maxOpenConnectionsGauge prometheus.GaugeFunc - openConnectionsGauge prometheus.GaugeFunc - inUseConnectionsGauge prometheus.GaugeFunc - idleConnectionsGauge prometheus.GaugeFunc - waitCountCounter prometheus.CounterFunc - waitDurationCounter prometheus.CounterFunc - maxIdleClosedCounter prometheus.CounterFunc - maxIdleTimeClosedCounter prometheus.CounterFunc - maxLifetimeClosedCounter prometheus.CounterFunc - roundTripProbe *roundTripProbe - roundTripTimeSummary prometheus.Summary - closedFromClientDisconnect prometheus.Counter - closedFromServerTimeout prometheus.Counter - closedFromStatementTimeout prometheus.Counter + registry *prometheus.Registry + queryCounter *prometheus.CounterVec + queryDurationSummary *prometheus.SummaryVec + maxOpenConnectionsGauge prometheus.GaugeFunc + openConnectionsGauge prometheus.GaugeFunc + inUseConnectionsGauge prometheus.GaugeFunc + idleConnectionsGauge prometheus.GaugeFunc + waitCountCounter prometheus.CounterFunc + waitDurationCounter prometheus.CounterFunc + maxIdleClosedCounter prometheus.CounterFunc 
+ maxIdleTimeClosedCounter prometheus.CounterFunc + maxLifetimeClosedCounter prometheus.CounterFunc + roundTripProbe *roundTripProbe + roundTripTimeSummary prometheus.Summary + abendCounter *prometheus.CounterVec } func RegisterMetrics(base *Session, namespace string, sub Subservice, registry *prometheus.Registry) SessionInterface { @@ -70,6 +69,8 @@ func RegisterMetrics(base *Session, namespace string, sub Subservice, registry * registry: registry, } + base.AddErrorHandler(s.handleErrorEvent) + s.queryCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, @@ -230,39 +231,17 @@ func RegisterMetrics(base *Session, namespace string, sub Subservice, registry * ) registry.MustRegister(s.maxLifetimeClosedCounter) - s.closedFromClientDisconnect = prometheus.NewCounter( - prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: "db", - Name: "client_closed_session_total", - Help: "total number of connections closed due to an upstream caller cancelling the context, " + - "this captures instances where http clients close their connection before receiving a response.", - ConstLabels: prometheus.Labels{"subservice": string(sub)}, - }, - ) - registry.MustRegister(s.closedFromClientDisconnect) - - s.closedFromServerTimeout = prometheus.NewCounter( - prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: "db", - Name: "server_timeout_closed_session_total", - Help: "total number of connections closed due to the sql taking longer than horizon CONNECTION_TIMEOUT", - ConstLabels: prometheus.Labels{"subservice": string(sub)}, - }, - ) - registry.MustRegister(s.closedFromServerTimeout) - - s.closedFromStatementTimeout = prometheus.NewCounter( + s.abendCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, Subsystem: "db", - Name: "statement_timeout_closed_session_total", - Help: "total number of connections closed due to the sql statement taking longer than db statement_timeout, which horizon sets based on 
CONNECTION_TIMEOUT", + Name: "abend_total", + Help: "total number of abends, details are captured in labels", ConstLabels: prometheus.Labels{"subservice": string(sub)}, }, + []string{"origin", "condition", "type"}, ) - registry.MustRegister(s.closedFromStatementTimeout) + registry.MustRegister(s.abendCounter) s.roundTripTimeSummary = prometheus.NewSummary( prometheus.SummaryOpts{ @@ -300,28 +279,10 @@ func (s *SessionWithMetrics) Close() error { s.registry.Unregister(s.maxIdleClosedCounter) s.registry.Unregister(s.maxIdleTimeClosedCounter) s.registry.Unregister(s.maxLifetimeClosedCounter) - s.registry.Unregister(s.closedFromClientDisconnect) - s.registry.Unregister(s.closedFromServerTimeout) - s.registry.Unregister(s.closedFromStatementTimeout) + s.registry.Unregister(s.abendCounter) return s.SessionInterface.Close() } -func (s *SessionWithMetrics) BeginTx(ctx context.Context, opts *sql.TxOptions) error { - return s.applyDisconnectMetrics(s.SessionInterface.BeginTx(ctx, opts)) -} - -func (s *SessionWithMetrics) Begin(ctx context.Context) error { - return s.applyDisconnectMetrics(s.SessionInterface.Begin(ctx)) -} - -func (s *SessionWithMetrics) Commit() error { - return s.applyDisconnectMetrics(s.SessionInterface.Commit()) -} - -func (s *SessionWithMetrics) Rollback() error { - return s.applyDisconnectMetrics(s.SessionInterface.Rollback()) -} - func (s *SessionWithMetrics) TruncateTables(ctx context.Context, tables []string) (err error) { timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { s.queryDurationSummary.With(prometheus.Labels{ @@ -356,18 +317,16 @@ func (s *SessionWithMetrics) Clone() SessionInterface { queryDurationSummary: s.queryDurationSummary, // txnCounter: s.txnCounter, // txnDurationSummary: s.txnDurationSummary, - maxOpenConnectionsGauge: s.maxOpenConnectionsGauge, - openConnectionsGauge: s.openConnectionsGauge, - inUseConnectionsGauge: s.inUseConnectionsGauge, - idleConnectionsGauge: s.idleConnectionsGauge, - waitCountCounter: 
s.waitCountCounter, - waitDurationCounter: s.waitDurationCounter, - maxIdleClosedCounter: s.maxIdleClosedCounter, - maxIdleTimeClosedCounter: s.maxIdleTimeClosedCounter, - maxLifetimeClosedCounter: s.maxLifetimeClosedCounter, - closedFromClientDisconnect: s.closedFromClientDisconnect, - closedFromServerTimeout: s.closedFromServerTimeout, - closedFromStatementTimeout: s.closedFromStatementTimeout, + maxOpenConnectionsGauge: s.maxOpenConnectionsGauge, + openConnectionsGauge: s.openConnectionsGauge, + inUseConnectionsGauge: s.inUseConnectionsGauge, + idleConnectionsGauge: s.idleConnectionsGauge, + waitCountCounter: s.waitCountCounter, + waitDurationCounter: s.waitDurationCounter, + maxIdleClosedCounter: s.maxIdleClosedCounter, + maxIdleTimeClosedCounter: s.maxIdleTimeClosedCounter, + maxLifetimeClosedCounter: s.maxLifetimeClosedCounter, + abendCounter: s.abendCounter, } } @@ -410,21 +369,52 @@ func getQueryType(ctx context.Context, query squirrel.Sqlizer) QueryType { return UndefinedQueryType } -func (s *SessionWithMetrics) applyDisconnectMetrics(err error) error { - if err == nil { - return nil +func (s *SessionWithMetrics) handleErrorEvent(dbErr error, callerContext context.Context) { + if dbErr == nil || s.NoRows(dbErr) { + return } - switch err { - case ErrCancelled: - s.closedFromClientDisconnect.Inc() - case ErrStatementTimeout: - s.closedFromStatementTimeout.Inc() - case ErrTimeout: - s.closedFromServerTimeout.Inc() + abendOrigin := "libpq" + abendType := "error" + abendCondition := "" + var abendDbErrorCode pq.ErrorCode + + if err, ok := dbErr.(*pq.Error); ok { + // libpq only provides the pg.Error if the context was clear of errors + abendDbErrorCode = err.Code + abendOrigin = "db" + abendCondition = abendDbErrorCode.Name() } - return err + switch { + // check the context first, if was canceled or timed out, then the libpq will only return the + // context error, no pg.Error, and no further condition + case callerContext.Err() == context.Canceled: + 
abendOrigin = "client_context" + abendType = "cancel" + case callerContext.Err() == context.DeadlineExceeded: + abendOrigin = "horizon_context" + abendType = "timeout" + default: + // since context is clear, check for any specfic overrides based on pg.Error from the db + switch abendDbErrorCode { + case "57014": + // https://www.postgresql.org/docs/12/errcodes-appendix.html + // 57014, query_canceled + // since the only functional cancels come from horizon context, which are + // trapped above, we are left with just statement timeouts as the reason for this. + abendType = "timeout" + case "": + // if here, context is clear and there's no pg.Error either, just use error string as-is + abendCondition = dbErr.Error() + } + } + + s.abendCounter.With(prometheus.Labels{ + "origin": abendOrigin, + "condition": abendCondition, + "type": abendType, + }).Inc() } func (s *SessionWithMetrics) Get(ctx context.Context, dest interface{}, query squirrel.Sqlizer) (err error) { @@ -445,8 +435,7 @@ func (s *SessionWithMetrics) Get(ctx context.Context, dest interface{}, query sq }).Inc() }() - err = s.SessionInterface.Get(ctx, dest, query) - return s.applyDisconnectMetrics(err) + return s.SessionInterface.Get(ctx, dest, query) } func (s *SessionWithMetrics) GetRaw(ctx context.Context, dest interface{}, query string, args ...interface{}) (err error) { @@ -471,8 +460,7 @@ func (s *SessionWithMetrics) Select(ctx context.Context, dest interface{}, query }).Inc() }() - err = s.SessionInterface.Select(ctx, dest, query) - return s.applyDisconnectMetrics(err) + return s.SessionInterface.Select(ctx, dest, query) } func (s *SessionWithMetrics) SelectRaw(ctx context.Context, dest interface{}, query string, args ...interface{}) (err error) { @@ -480,8 +468,7 @@ func (s *SessionWithMetrics) SelectRaw(ctx context.Context, dest interface{}, qu } func (s *SessionWithMetrics) Query(ctx context.Context, query squirrel.Sqlizer) (*sqlx.Rows, error) { - result, err := s.SessionInterface.Query(ctx, query) 
- return result, s.applyDisconnectMetrics(err) + return s.SessionInterface.Query(ctx, query) } func (s *SessionWithMetrics) QueryRaw(ctx context.Context, query string, args ...interface{}) (*sqlx.Rows, error) { @@ -506,8 +493,7 @@ func (s *SessionWithMetrics) Exec(ctx context.Context, query squirrel.Sqlizer) ( }).Inc() }() - result, err = s.SessionInterface.Exec(ctx, query) - return result, s.applyDisconnectMetrics(err) + return s.SessionInterface.Exec(ctx, query) } func (s *SessionWithMetrics) ExecRaw(ctx context.Context, query string, args ...interface{}) (result sql.Result, err error) { diff --git a/support/db/session.go b/support/db/session.go index faddab5dc8..484fea6dce 100644 --- a/support/db/session.go +++ b/support/db/session.go @@ -10,6 +10,7 @@ import ( sq "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" + "github.com/lib/pq" "github.com/stellar/go/support/db/sqlutils" "github.com/stellar/go/support/errors" "github.com/stellar/go/support/log" @@ -23,7 +24,7 @@ func (s *Session) Begin(ctx context.Context) error { tx, err := s.DB.BeginTxx(ctx, nil) if err != nil { - if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil { + if knownErr := s.handleError(err, ctx); knownErr != nil { return knownErr } @@ -44,7 +45,7 @@ func (s *Session) BeginTx(ctx context.Context, opts *sql.TxOptions) error { tx, err := s.DB.BeginTxx(ctx, opts) if err != nil { - if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil { + if knownErr := s.handleError(err, ctx); knownErr != nil { return knownErr } @@ -92,7 +93,7 @@ func (s *Session) Commit() error { s.tx = nil s.txOptions = nil - if knownErr := s.replaceWithKnownError(err, context.Background()); knownErr != nil { + if knownErr := s.handleError(err, context.Background()); knownErr != nil { return knownErr } return err @@ -146,7 +147,7 @@ func (s *Session) GetRaw(ctx context.Context, dest interface{}, query string, ar return nil } - if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil 
{ + if knownErr := s.handleError(err, ctx); knownErr != nil { return knownErr } @@ -215,7 +216,7 @@ func (s *Session) ExecRaw(ctx context.Context, query string, args ...interface{} return result, nil } - if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil { + if knownErr := s.handleError(err, ctx); knownErr != nil { return nil, knownErr } @@ -232,13 +233,33 @@ func (s *Session) NoRows(err error) bool { return err == sql.ErrNoRows } -// replaceWithKnownError tries to replace Postgres error with package error. -// Returns a new error if the err is known. -func (s *Session) replaceWithKnownError(err error, ctx context.Context) error { - if err == nil { +func (s *Session) AddErrorHandler(handler ErrorHandlerFunc) { + s.errorHandlers = append(s.errorHandlers, handler) +} + +// handleError does housekeeping on errors from db. +// dbErr - the libpq client error +// ctx - the caller's context +// +// tries to replace dbErr with horizon package error, returns a new error if the err is known. 
+// invokes any additional error handlers that may have been +// added to the session, passing the caller's context +func (s *Session) handleError(dbErr error, ctx context.Context) error { + if dbErr == nil { return nil } + for _, handler := range s.errorHandlers { + handler(dbErr, ctx) + } + + var abendDbErrorCode pq.ErrorCode + + // libpq will only return pg server error if context did not cancel/timeout + if err, ok := dbErr.(*pq.Error); ok { + abendDbErrorCode = err.Code + } + switch { case ctx.Err() == context.Canceled: return ErrCancelled @@ -246,16 +267,14 @@ func (s *Session) replaceWithKnownError(err error, ctx context.Context) error { // when horizon's context times out(it's set to app connection-timeout), it triggers pg to emit "pq: canceling statement due to user request" // so, in order of precedence, check the ctx deadline first to classify this as a timeout, not a cancel return ErrTimeout - case strings.Contains(err.Error(), "pq: canceling statement due to user request"): - // this is from an external initiated cancel signal sent to pg on a running sql query - return ErrCancelled - case strings.Contains(err.Error(), "pq: canceling statement due to conflict with recovery"): + case abendDbErrorCode.Name() == "query_canceled": + // https://www.postgresql.org/docs/12/errcodes-appendix.html, query_canceled + return ErrStatementTimeout + case strings.Contains(dbErr.Error(), "pq: canceling statement due to conflict with recovery"): return ErrConflictWithRecovery - case strings.Contains(err.Error(), "driver: bad connection"): + case strings.Contains(dbErr.Error(), "driver: bad connection"): return ErrBadConnection - case strings.Contains(err.Error(), "pq: canceling statement due to statement timeout"): - return ErrStatementTimeout - case strings.Contains(err.Error(), "transaction has already been committed or rolled back"): + case strings.Contains(dbErr.Error(), "transaction has already been committed or rolled back"): return ErrAlreadyRolledback default: 
return nil @@ -286,7 +305,7 @@ func (s *Session) QueryRaw(ctx context.Context, query string, args ...interface{ return result, nil } - if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil { + if knownErr := s.handleError(err, ctx); knownErr != nil { return nil, knownErr } @@ -320,7 +339,7 @@ func (s *Session) Rollback() error { s.tx = nil s.txOptions = nil - if knownErr := s.replaceWithKnownError(err, context.Background()); knownErr != nil { + if knownErr := s.handleError(err, context.Background()); knownErr != nil { return knownErr } return err @@ -364,7 +383,7 @@ func (s *Session) SelectRaw( return nil } - if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil { + if knownErr := s.handleError(err, ctx); knownErr != nil { return knownErr } diff --git a/support/db/session_test.go b/support/db/session_test.go index b756dde837..a4f7c9f2a7 100644 --- a/support/db/session_test.go +++ b/support/db/session_test.go @@ -6,11 +6,8 @@ import ( "testing" "time" - "github.com/DATA-DOG/go-sqlmock" - "github.com/jmoiron/sqlx" "github.com/prometheus/client_golang/prometheus" "github.com/stellar/go/support/db/dbtest" - "github.com/stellar/go/support/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -21,7 +18,7 @@ func TestContextTimeoutDuringSql(t *testing.T) { var cancel context.CancelFunc ctx := context.Background() - ctx, cancel = context.WithTimeout(ctx, time.Duration(1)) + ctx, cancel = context.WithTimeout(ctx, 2*time.Second) assert := assert.New(t) sessRaw := &Session{DB: db.Open()} @@ -32,20 +29,18 @@ func TestContextTimeoutDuringSql(t *testing.T) { defer cancel() var count int - err := sess.GetRaw(ctx, &count, "SELECT pg_sleep(2), COUNT(*) FROM people") - assert.ErrorIs(err, ErrTimeout, "long running db server operation past context timeout, should return timeout") + var wg sync.WaitGroup + wg.Add(1) - metrics, err := reg.Gather() - assert.NoError(err) + go func() { + err := sess.GetRaw(ctx, &count, "SELECT 
pg_sleep(5) FROM people") + assert.ErrorIs(err, ErrTimeout, "long running db server operation past context timeout, should return timeout") + wg.Done() + }() - for _, metricFamily := range metrics { - if metricFamily.GetName() == "test_db_server_timeout_closed_session_total" { - assert.Len(metricFamily.GetMetric(), 1) - assert.Equal(metricFamily.GetMetric()[0].GetCounter().GetValue(), float64(1)) - return - } - } - assert.Fail("server_timeout_closed_session_total metrics were not correct") + require.Eventually(t, func() bool { wg.Wait(); return true }, 5*time.Second, time.Second) + // note, condition is populated with the db error, since a trip to server was made with sql running at time of cancel + assertAbendMetrics(reg, "horizon_context", "query_canceled", "timeout", assert) } func TestContextTimeoutBeforeSql(t *testing.T) { @@ -54,7 +49,7 @@ func TestContextTimeoutBeforeSql(t *testing.T) { var cancel context.CancelFunc ctx := context.Background() - ctx, cancel = context.WithTimeout(ctx, time.Second) + ctx, cancel = context.WithTimeout(ctx, time.Millisecond) assert := assert.New(t) sessRaw := &Session{DB: db.Open()} @@ -65,103 +60,11 @@ func TestContextTimeoutBeforeSql(t *testing.T) { defer cancel() var count int - time.Sleep(time.Second) + time.Sleep(500 * time.Millisecond) err := sess.GetRaw(ctx, &count, "SELECT pg_sleep(5) FROM people") assert.ErrorIs(err, ErrTimeout, "any db server operation should return error immediately if context already timed out") - - metrics, err := reg.Gather() - assert.NoError(err) - - for _, metricFamily := range metrics { - if metricFamily.GetName() == "test_db_server_timeout_closed_session_total" { - assert.Len(metricFamily.GetMetric(), 1) - assert.Equal(metricFamily.GetMetric()[0].GetCounter().GetValue(), float64(1)) - return - } - } - assert.Fail("server_timeout_closed_session_total metrics were not correct") -} - -func TestExternallyCancelledSql(t *testing.T) { - var cancel context.CancelFunc - ctx := context.Background() - 
ctx, cancel = context.WithCancel(ctx) - assert := assert.New(t) - - mockDB, mock, err := sqlmock.New() - if err != nil { - t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) - } - defer mockDB.Close() - sqlxDB := sqlx.NewDb(mockDB, "sqlmock") - - sessRaw := &Session{DB: sqlxDB} - reg := prometheus.NewRegistry() - - sess := RegisterMetrics(sessRaw, "test", "subtest", reg) - - defer sess.Close() - defer cancel() - - // simulate an external(not originating from horizon) cancelled sql statement, such as cancelled from psql usage - mock.ExpectQuery("SELECT 1").WillReturnError(errors.New("pq: canceling statement due to user request")) - - var count int - err = sess.GetRaw(ctx, &count, "SELECT 1") - assert.ErrorIs(err, ErrCancelled, "externally cancelled sql statement, should return cancelled error") - - metrics, err := reg.Gather() - assert.NoError(err) - - for _, metricFamily := range metrics { - if metricFamily.GetName() == "test_db_client_closed_session_total" { - assert.Len(metricFamily.GetMetric(), 1) - assert.Equal(metricFamily.GetMetric()[0].GetCounter().GetValue(), float64(1)) - return - } - } - assert.Fail("client_closed_session_total metrics were not correct") -} - -func TestSqlStatementTimeout(t *testing.T) { - var cancel context.CancelFunc - ctx := context.Background() - ctx, cancel = context.WithCancel(ctx) - assert := assert.New(t) - - mockDB, mock, err := sqlmock.New() - if err != nil { - t.Fatalf("an error '%s' was not expected when opening a stub database connection", err) - } - defer mockDB.Close() - sqlxDB := sqlx.NewDb(mockDB, "sqlmock") - - sessRaw := &Session{DB: sqlxDB} - reg := prometheus.NewRegistry() - - sess := RegisterMetrics(sessRaw, "test", "subtest", reg) - - defer sess.Close() - defer cancel() - - // simulate pg statement timeout - mock.ExpectQuery("SELECT 1").WillReturnError(errors.New("pq: canceling statement due to statement timeout")) - - var count int - err = sess.GetRaw(ctx, &count, "SELECT 1") - 
assert.ErrorIs(err, ErrStatementTimeout, "sql statement timeout, should return timeout error") - - metrics, err := reg.Gather() - assert.NoError(err) - - for _, metricFamily := range metrics { - if metricFamily.GetName() == "test_db_statement_timeout_closed_session_total" { - assert.Len(metricFamily.GetMetric(), 1) - assert.Equal(metricFamily.GetMetric()[0].GetCounter().GetValue(), float64(1)) - return - } - } - assert.Fail("statement_timeout_closed_session_total metrics were not correct") + // note, the condition is empty, the sql never made it to db, libpq short-circuited it based on ctx + assertAbendMetrics(reg, "horizon_context", "", "timeout", assert) } func TestContextCancelledBeforeSql(t *testing.T) { @@ -184,18 +87,8 @@ func TestContextCancelledBeforeSql(t *testing.T) { cancel() err := sess.GetRaw(ctx, &count, "SELECT pg_sleep(2), COUNT(*) FROM people") assert.ErrorIs(err, ErrCancelled, "any db server operation should return error immediately if user already cancel") - - metrics, err := reg.Gather() - assert.NoError(err) - - for _, metricFamily := range metrics { - if metricFamily.GetName() == "test_db_client_closed_session_total" { - assert.Len(metricFamily.GetMetric(), 1) - assert.Equal(metricFamily.GetMetric()[0].GetCounter().GetValue(), float64(1)) - return - } - } - assert.Fail("client_closed_session_total metrics were not correct") + // note, the condition is empty, the sql never made it to db, libpq short-circuited it based on ctx + assertAbendMetrics(reg, "client_context", "", "cancel", assert) } func TestContextCancelDuringSql(t *testing.T) { @@ -227,18 +120,26 @@ func TestContextCancelDuringSql(t *testing.T) { cancel() require.Eventually(t, func() bool { wg.Wait(); return true }, 5*time.Second, time.Second) + // note, condition is populated with the db error, since a trip to server was made with sql running at time of cancel + assertAbendMetrics(reg, "client_context", "query_canceled", "cancel", assert) +} - metrics, err := reg.Gather() +func 
TestStatementTimeout(t *testing.T) { + assert := assert.New(t) + db := dbtest.Postgres(t).Load(testSchema) + defer db.Close() + + sessRaw, err := Open(db.Dialect, db.DSN, StatementTimeout(50*time.Millisecond)) + reg := prometheus.NewRegistry() + + sess := RegisterMetrics(sessRaw, "test", "subtest", reg) assert.NoError(err) + defer sess.Close() - for _, metricFamily := range metrics { - if metricFamily.GetName() == "test_db_client_closed_session_total" { - assert.Len(metricFamily.GetMetric(), 1) - assert.Equal(metricFamily.GetMetric()[0].GetCounter().GetValue(), float64(1)) - return - } - } - assert.Fail("client_closed_session_total metrics were not correct") + var count int + err = sess.GetRaw(context.Background(), &count, "SELECT pg_sleep(2) FROM people") + assert.ErrorIs(err, ErrStatementTimeout) + assertAbendMetrics(reg, "db", "query_canceled", "timeout", assert) } func TestSession(t *testing.T) { @@ -248,10 +149,13 @@ func TestSession(t *testing.T) { ctx := context.Background() assert := assert.New(t) require := require.New(t) - sess := &Session{DB: db.Open()} - defer sess.DB.Close() + sessRaw := &Session{DB: db.Open()} + reg := prometheus.NewRegistry() - assert.Equal("postgres", sess.Dialect()) + sess := RegisterMetrics(sessRaw, "test", "subtest", reg) + defer sess.Close() + + assert.Equal("postgres", sessRaw.Dialect()) var count int err := sess.GetRaw(ctx, &count, "SELECT COUNT(*) FROM people") @@ -318,24 +222,12 @@ func TestSession(t *testing.T) { assert.Len(names, 2) // Test ReplacePlaceholders - out, err := sess.ReplacePlaceholders("? = ? = ? = ??") + out, err := sessRaw.ReplacePlaceholders("? = ? = ? 
= ??") if assert.NoError(err) { assert.Equal("$1 = $2 = $3 = ?", out) } -} - -func TestStatementTimeout(t *testing.T) { - assert := assert.New(t) - db := dbtest.Postgres(t).Load(testSchema) - defer db.Close() - - sess, err := Open(db.Dialect, db.DSN, StatementTimeout(50*time.Millisecond)) - assert.NoError(err) - defer sess.Close() - var count int - err = sess.GetRaw(context.Background(), &count, "SELECT pg_sleep(2), COUNT(*) FROM people") - assert.ErrorIs(err, ErrStatementTimeout) + assertZeroAbendMetrics(reg, assert) } func TestIdleTransactionTimeout(t *testing.T) { @@ -343,8 +235,11 @@ func TestIdleTransactionTimeout(t *testing.T) { db := dbtest.Postgres(t).Load(testSchema) defer db.Close() - sess, err := Open(db.Dialect, db.DSN, IdleTransactionTimeout(50*time.Millisecond)) + sessRaw, err := Open(db.Dialect, db.DSN, IdleTransactionTimeout(50*time.Millisecond)) assert.NoError(err) + reg := prometheus.NewRegistry() + sess := RegisterMetrics(sessRaw, "test", "subtest", reg) + defer sess.Close() assert.NoError(sess.Begin(context.Background())) @@ -353,26 +248,79 @@ func TestIdleTransactionTimeout(t *testing.T) { var count int err = sess.GetRaw(context.Background(), &count, "SELECT COUNT(*) FROM people") assert.ErrorIs(err, ErrBadConnection) + assertAbendMetrics(reg, "libpq", "driver: bad connection", "error", assert) } func TestSessionRollbackAfterContextCanceled(t *testing.T) { + assert := assert.New(t) db := dbtest.Postgres(t).Load(testSchema) defer db.Close() - sess := setupRolledbackTx(t, db) - defer sess.DB.Close() + sessRaw := setupRolledbackTx(t, db) + reg := prometheus.NewRegistry() + sess := RegisterMetrics(sessRaw, "test", "subtest", reg) + defer sess.Close() - assert.ErrorIs(t, sess.Rollback(), ErrAlreadyRolledback) + assert.ErrorIs(sess.Rollback(), ErrAlreadyRolledback) + assertAbendMetrics(reg, "libpq", "sql: transaction has already been committed or rolled back", "error", assert) } func TestSessionCommitAfterContextCanceled(t *testing.T) { + assert := 
assert.New(t) db := dbtest.Postgres(t).Load(testSchema) defer db.Close() - sess := setupRolledbackTx(t, db) - defer sess.DB.Close() + sessRaw := setupRolledbackTx(t, db) + reg := prometheus.NewRegistry() + sess := RegisterMetrics(sessRaw, "test", "subtest", reg) + defer sess.Close() + + assert.ErrorIs(sess.Commit(), ErrAlreadyRolledback) + assertAbendMetrics(reg, "libpq", "sql: transaction has already been committed or rolled back", "error", assert) +} + +func assertZeroAbendMetrics(reg *prometheus.Registry, assert *assert.Assertions) { + metrics, err := reg.Gather() + assert.NoError(err) + + for _, metricFamily := range metrics { + if metricFamily.GetName() == "test_db_abend_total" { + assert.Fail("abend_total metrics should not be present, never incremented") + } + } + +} - assert.ErrorIs(t, sess.Commit(), ErrAlreadyRolledback) +func assertAbendMetrics(reg *prometheus.Registry, assertOrigin, assertCondition, assertType string, assert *assert.Assertions) { + metrics, err := reg.Gather() + assert.NoError(err) + + for _, metricFamily := range metrics { + if metricFamily.GetName() == "test_db_abend_total" { + assert.Len(metricFamily.GetMetric(), 1) + assert.Equal(metricFamily.GetMetric()[0].GetCounter().GetValue(), float64(1)) + var origin = "" + var condition = "" + var abend_type = "" + for _, label := range metricFamily.GetMetric()[0].GetLabel() { + if label.GetName() == "origin" { + origin = label.GetValue() + } + if label.GetName() == "condition" { + condition = label.GetValue() + } + if label.GetName() == "type" { + abend_type = label.GetValue() + } + } + + assert.Equal(origin, assertOrigin) + assert.Equal(condition, assertCondition) + assert.Equal(abend_type, assertType) + return + } + } + assert.Fail("abend_total metrics were not correct") } func setupRolledbackTx(t *testing.T, db *dbtest.DB) *Session { From fd3e0722fa1266c0176c804b954548e789293017 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Wed, 28 Feb 2024 23:55:16 -0800 Subject: [PATCH 04/10] #5217: 
update inline comments on db error metrics --- support/db/metrics.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/support/db/metrics.go b/support/db/metrics.go index 65e915524b..5de11b795f 100644 --- a/support/db/metrics.go +++ b/support/db/metrics.go @@ -380,15 +380,16 @@ func (s *SessionWithMetrics) handleErrorEvent(dbErr error, callerContext context var abendDbErrorCode pq.ErrorCode if err, ok := dbErr.(*pq.Error); ok { - // libpq only provides the pg.Error if the context was clear of errors + // libpq only provides a pg.Error if a server trip was made, otherwise it may not be present + // the error could just be context or libpq err abendDbErrorCode = err.Code abendOrigin = "db" abendCondition = abendDbErrorCode.Name() } switch { - // check the context first, if was canceled or timed out, then the libpq will only return the - // context error, no pg.Error, and no further condition + // check the context first, if was canceled or timed out, + // then it gets precedence on determining the type and origin case callerContext.Err() == context.Canceled: abendOrigin = "client_context" abendType = "cancel" @@ -396,16 +397,16 @@ func (s *SessionWithMetrics) handleErrorEvent(dbErr error, callerContext context abendOrigin = "horizon_context" abendType = "timeout" default: - // since context is clear, check for any specfic overrides based on pg.Error from the db + // since context is clear, check for any specfic overrides for type based on pg.Error from the db switch abendDbErrorCode { case "57014": // https://www.postgresql.org/docs/12/errcodes-appendix.html // 57014, query_canceled - // since the only functional cancels come from horizon context, which are - // trapped above, we are left with just statement timeouts as the reason for this. 
+ // since the only functional cancels come from horizon client's context, which are + // trapped above in context introspection, we are left with just possibility of statement timeouts as the reason for this. abendType = "timeout" case "": - // if here, context is clear and there's no pg.Error either, just use error string as-is + // if here, context is clear and there's no pg.Error either, just use the error string as-is abendCondition = dbErr.Error() } } From 26c3316ab8f1a1acdbc17706821139f22ad8820b Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Thu, 29 Feb 2024 11:12:44 -0800 Subject: [PATCH 05/10] #5217: review feedbacks --- support/db/metrics.go | 25 ++++++++++--------------- support/db/session.go | 9 ++++++--- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/support/db/metrics.go b/support/db/metrics.go index 5de11b795f..dc1e85dcf4 100644 --- a/support/db/metrics.go +++ b/support/db/metrics.go @@ -3,12 +3,12 @@ package db import ( "context" "database/sql" + "errors" "fmt" "strings" "time" "github.com/Masterminds/squirrel" - "github.com/jmoiron/sqlx" "github.com/lib/pq" "github.com/prometheus/client_golang/prometheus" ) @@ -378,11 +378,12 @@ func (s *SessionWithMetrics) handleErrorEvent(dbErr error, callerContext context abendType := "error" abendCondition := "" var abendDbErrorCode pq.ErrorCode + var pqErr *pq.Error - if err, ok := dbErr.(*pq.Error); ok { + if errors.As(dbErr, &pqErr) { // libpq only provides a pg.Error if a server trip was made, otherwise it may not be present // the error could just be context or libpq err - abendDbErrorCode = err.Code + abendDbErrorCode = pqErr.Code abendOrigin = "db" abendCondition = abendDbErrorCode.Name() } @@ -435,8 +436,8 @@ func (s *SessionWithMetrics) Get(ctx context.Context, dest interface{}, query sq "route": contextRoute(ctx), }).Inc() }() - - return s.SessionInterface.Get(ctx, dest, query) + err = s.SessionInterface.Get(ctx, dest, query) + return err } func (s *SessionWithMetrics) GetRaw(ctx 
context.Context, dest interface{}, query string, args ...interface{}) (err error) { @@ -461,21 +462,14 @@ func (s *SessionWithMetrics) Select(ctx context.Context, dest interface{}, query }).Inc() }() - return s.SessionInterface.Select(ctx, dest, query) + err = s.SessionInterface.Select(ctx, dest, query) + return err } func (s *SessionWithMetrics) SelectRaw(ctx context.Context, dest interface{}, query string, args ...interface{}) (err error) { return s.Select(ctx, dest, squirrel.Expr(query, args...)) } -func (s *SessionWithMetrics) Query(ctx context.Context, query squirrel.Sqlizer) (*sqlx.Rows, error) { - return s.SessionInterface.Query(ctx, query) -} - -func (s *SessionWithMetrics) QueryRaw(ctx context.Context, query string, args ...interface{}) (*sqlx.Rows, error) { - return s.Query(ctx, squirrel.Expr(query, args...)) -} - func (s *SessionWithMetrics) Exec(ctx context.Context, query squirrel.Sqlizer) (result sql.Result, err error) { queryType := string(getQueryType(ctx, query)) timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { @@ -494,7 +488,8 @@ func (s *SessionWithMetrics) Exec(ctx context.Context, query squirrel.Sqlizer) ( }).Inc() }() - return s.SessionInterface.Exec(ctx, query) + result, err = s.SessionInterface.Exec(ctx, query) + return result, err } func (s *SessionWithMetrics) ExecRaw(ctx context.Context, query string, args ...interface{}) (result sql.Result, err error) { diff --git a/support/db/session.go b/support/db/session.go index 484fea6dce..8545860ed0 100644 --- a/support/db/session.go +++ b/support/db/session.go @@ -3,6 +3,7 @@ package db import ( "context" "database/sql" + go_errors "errors" "fmt" "reflect" "strings" @@ -254,10 +255,12 @@ func (s *Session) handleError(dbErr error, ctx context.Context) error { } var abendDbErrorCode pq.ErrorCode + var pqErr *pq.Error - // libpq will only return pg server error if context did not cancel/timeout - if err, ok := dbErr.(*pq.Error); ok { - abendDbErrorCode = err.Code + // libpq will 
only wrap a pg server error if context was not cancel/timeout first + // and was able to send request to server + if go_errors.As(dbErr, &pqErr) { + abendDbErrorCode = pqErr.Code } switch { From 8aabea605b1de3555723c8dea77a406a971b3fd1 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Thu, 29 Feb 2024 14:47:38 -0800 Subject: [PATCH 06/10] #5217: check pg error first, ctx second, don't store err string on metrics mapping --- support/db/main.go | 4 ++-- support/db/metrics.go | 44 +++++++++++++++++--------------------- support/db/session.go | 33 +++++++++++++++++----------- support/db/session_test.go | 25 +++++++++++----------- 4 files changed, 56 insertions(+), 50 deletions(-) diff --git a/support/db/main.go b/support/db/main.go index bb16083362..dca23526ee 100644 --- a/support/db/main.go +++ b/support/db/main.go @@ -124,8 +124,8 @@ type Session struct { } // dbErr - the Postgres error -// callerContext - the caller's context -type ErrorHandlerFunc func(dbErr error, callerContext context.Context) +// ctx - the caller's context +type ErrorHandlerFunc func(dbErr error, ctx context.Context) type SessionInterface interface { BeginTx(ctx context.Context, opts *sql.TxOptions) error Begin(ctx context.Context) error diff --git a/support/db/metrics.go b/support/db/metrics.go index dc1e85dcf4..47f17b8b62 100644 --- a/support/db/metrics.go +++ b/support/db/metrics.go @@ -369,47 +369,43 @@ func getQueryType(ctx context.Context, query squirrel.Sqlizer) QueryType { return UndefinedQueryType } -func (s *SessionWithMetrics) handleErrorEvent(dbErr error, callerContext context.Context) { +// derive the db 'abend_total' metric from the err returned by libpq sdk +// +// dbErr - the error returned by any libpq method call +// ctx - the caller's context used on libpb method call +func (s *SessionWithMetrics) handleErrorEvent(dbErr error, ctx context.Context) { if dbErr == nil || s.NoRows(dbErr) { return } + // default the metric to based just on top level libpq error abendOrigin := 
"libpq" abendType := "error" - abendCondition := "" - var abendDbErrorCode pq.ErrorCode + abendCondition := "n/a" + var pgDbErrorCode string var pqErr *pq.Error + // apply db server error info if it exists + // libpq only provides a pg.Error if a server trip was made, otherwise it may not be present if errors.As(dbErr, &pqErr) { - // libpq only provides a pg.Error if a server trip was made, otherwise it may not be present - // the error could just be context or libpq err - abendDbErrorCode = pqErr.Code + pgDbErrorCode = string(pqErr.Code) abendOrigin = "db" - abendCondition = abendDbErrorCode.Name() + abendCondition = pgDbErrorCode } + // apply remaining overrides to metric, when these specific points exist switch { - // check the context first, if was canceled or timed out, - // then it gets precedence on determining the type and origin - case callerContext.Err() == context.Canceled: + case errors.Is(ctx.Err(), context.Canceled): abendOrigin = "client_context" abendType = "cancel" - case callerContext.Err() == context.DeadlineExceeded: + case errors.Is(ctx.Err(), context.DeadlineExceeded): abendOrigin = "horizon_context" abendType = "timeout" - default: - // since context is clear, check for any specfic overrides for type based on pg.Error from the db - switch abendDbErrorCode { - case "57014": - // https://www.postgresql.org/docs/12/errcodes-appendix.html - // 57014, query_canceled - // since the only functional cancels come from horizon client's context, which are - // trapped above in context introspection, we are left with just possibility of statement timeouts as the reason for this. 
- abendType = "timeout" - case "": - // if here, context is clear and there's no pg.Error either, just use the error string as-is - abendCondition = dbErr.Error() - } + case pgDbErrorCode == "57014": + // if getting here, no context deadline happened, but + // the db reported query_canceled, which leaves only the possibility of + // db-side statement timeout was triggered + abendType = "timeout" } s.abendCounter.With(prometheus.Labels{ diff --git a/support/db/session.go b/support/db/session.go index 8545860ed0..02ff266ff4 100644 --- a/support/db/session.go +++ b/support/db/session.go @@ -240,7 +240,7 @@ func (s *Session) AddErrorHandler(handler ErrorHandlerFunc) { // handleError does housekeeping on errors from db. // dbErr - the libpq client error -// ctx - the caller's context +// ctx - the calling context // // tries to replace dbErr with horizon package error, returns a new error if the err is known. // invokes any additional error handlers that may have been @@ -257,28 +257,37 @@ func (s *Session) handleError(dbErr error, ctx context.Context) error { var abendDbErrorCode pq.ErrorCode var pqErr *pq.Error - // libpq will only wrap a pg server error if context was not cancel/timeout first - // and was able to send request to server + // if libpql sends to server, and then any server side error is reported, + // libpq passes back only an pq.ErrorCode from method call + // even if the caller context generates a cancel/deadline error during the server trip, + // libpq will only return an instance of pq.ErrorCode as a non-wrapped error if go_errors.As(dbErr, &pqErr) { abendDbErrorCode = pqErr.Code } switch { - case ctx.Err() == context.Canceled: - return ErrCancelled - case ctx.Err() == context.DeadlineExceeded: - // when horizon's context times out(it's set to app connection-timeout), it triggers pg to emit "pq: canceling statement due to user request" - // so, in order of precedence, check the ctx deadline first to classify this as a timeout, not a cancel - return 
ErrTimeout - case abendDbErrorCode.Name() == "query_canceled": - // https://www.postgresql.org/docs/12/errcodes-appendix.html, query_canceled - return ErrStatementTimeout case strings.Contains(dbErr.Error(), "pq: canceling statement due to conflict with recovery"): return ErrConflictWithRecovery case strings.Contains(dbErr.Error(), "driver: bad connection"): return ErrBadConnection case strings.Contains(dbErr.Error(), "transaction has already been committed or rolled back"): return ErrAlreadyRolledback + case go_errors.Is(ctx.Err(), context.Canceled): + // when horizon's context is cancelled by it's upstream api client, + // it will propagate to here and libpq will emit a wrapped err that has the cancel err + return ErrCancelled + case go_errors.Is(ctx.Err(), context.DeadlineExceeded): + // when horizon's context times out(it's set to app connection-timeout), + // it will trigger libpq to emit a wrapped err that has the deadline err + return ErrTimeout + case abendDbErrorCode == "57014": + // https://www.postgresql.org/docs/12/errcodes-appendix.html, query_canceled + // this code can be generated for multiple cases, + // by libpq sending a signal to server when it experiences a context cancel/deadline + // or it could happen based on just server statement_timeout setting + // since we check the context cancel/deadline err state first, getting here means + // this can only be from a statement timeout + return ErrStatementTimeout default: return nil } diff --git a/support/db/session_test.go b/support/db/session_test.go index a4f7c9f2a7..606038dffe 100644 --- a/support/db/session_test.go +++ b/support/db/session_test.go @@ -39,8 +39,8 @@ func TestContextTimeoutDuringSql(t *testing.T) { }() require.Eventually(t, func() bool { wg.Wait(); return true }, 5*time.Second, time.Second) - // note, condition is populated with the db error, since a trip to server was made with sql running at time of cancel - assertAbendMetrics(reg, "horizon_context", "query_canceled", "timeout", 
assert) + // note, condition is populated with the db error, since a trip to server was made with sql running at time of deadline exceeded + assertAbendMetrics(reg, "horizon_context", "57014", "timeout", assert) } func TestContextTimeoutBeforeSql(t *testing.T) { @@ -63,8 +63,8 @@ func TestContextTimeoutBeforeSql(t *testing.T) { time.Sleep(500 * time.Millisecond) err := sess.GetRaw(ctx, &count, "SELECT pg_sleep(5) FROM people") assert.ErrorIs(err, ErrTimeout, "any db server operation should return error immediately if context already timed out") - // note, the condition is empty, the sql never made it to db, libpq short-circuited it based on ctx - assertAbendMetrics(reg, "horizon_context", "", "timeout", assert) + // note, the condition is empty, the sql never made it to db, libpq short-circuited it based on ctx already deadlined + assertAbendMetrics(reg, "horizon_context", "n/a", "timeout", assert) } func TestContextCancelledBeforeSql(t *testing.T) { @@ -87,8 +87,8 @@ func TestContextCancelledBeforeSql(t *testing.T) { cancel() err := sess.GetRaw(ctx, &count, "SELECT pg_sleep(2), COUNT(*) FROM people") assert.ErrorIs(err, ErrCancelled, "any db server operation should return error immediately if user already cancel") - // note, the condition is empty, the sql never made it to db, libpq short-circuited it based on ctx - assertAbendMetrics(reg, "client_context", "", "cancel", assert) + // note, the condition is empty, the sql never made it to db, libpq short-circuited it based on ctx already canceled + assertAbendMetrics(reg, "client_context", "n/a", "cancel", assert) } func TestContextCancelDuringSql(t *testing.T) { @@ -120,8 +120,8 @@ func TestContextCancelDuringSql(t *testing.T) { cancel() require.Eventually(t, func() bool { wg.Wait(); return true }, 5*time.Second, time.Second) - // note, condition is populated with the db error, since a trip to server was made with sql running at time of cancel - assertAbendMetrics(reg, "client_context", "query_canceled", "cancel", 
assert) + // note, condition is populated with the db error, since a trip to server was made with sql running at time of ctx cancel + assertAbendMetrics(reg, "client_context", "57014", "cancel", assert) } func TestStatementTimeout(t *testing.T) { @@ -139,7 +139,8 @@ func TestStatementTimeout(t *testing.T) { var count int err = sess.GetRaw(context.Background(), &count, "SELECT pg_sleep(2) FROM people") assert.ErrorIs(err, ErrStatementTimeout) - assertAbendMetrics(reg, "db", "query_canceled", "timeout", assert) + // if the metric is source=db and condition=57014, then it's a statement timeout on the server + assertAbendMetrics(reg, "db", "57014", "timeout", assert) } func TestSession(t *testing.T) { @@ -248,7 +249,7 @@ func TestIdleTransactionTimeout(t *testing.T) { var count int err = sess.GetRaw(context.Background(), &count, "SELECT COUNT(*) FROM people") assert.ErrorIs(err, ErrBadConnection) - assertAbendMetrics(reg, "libpq", "driver: bad connection", "error", assert) + assertAbendMetrics(reg, "libpq", "n/a", "error", assert) } func TestSessionRollbackAfterContextCanceled(t *testing.T) { @@ -262,7 +263,7 @@ func TestSessionRollbackAfterContextCanceled(t *testing.T) { defer sess.Close() assert.ErrorIs(sess.Rollback(), ErrAlreadyRolledback) - assertAbendMetrics(reg, "libpq", "sql: transaction has already been committed or rolled back", "error", assert) + assertAbendMetrics(reg, "libpq", "n/a", "error", assert) } func TestSessionCommitAfterContextCanceled(t *testing.T) { @@ -276,7 +277,7 @@ func TestSessionCommitAfterContextCanceled(t *testing.T) { defer sess.Close() assert.ErrorIs(sess.Commit(), ErrAlreadyRolledback) - assertAbendMetrics(reg, "libpq", "sql: transaction has already been committed or rolled back", "error", assert) + assertAbendMetrics(reg, "libpq", "n/a", "error", assert) } func assertZeroAbendMetrics(reg *prometheus.Registry, assert *assert.Assertions) { From ae3579f86c846455f24da8903587a2982aacfe2c Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: 
Thu, 29 Feb 2024 16:25:12 -0800 Subject: [PATCH 07/10] #5217: added more test coverage on other pg server error code mapping to metric condition --- support/db/session_test.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/support/db/session_test.go b/support/db/session_test.go index 606038dffe..c99bd205d3 100644 --- a/support/db/session_test.go +++ b/support/db/session_test.go @@ -6,6 +6,8 @@ import ( "testing" "time" + //"github.com/lib/pq" + "github.com/lib/pq" "github.com/prometheus/client_golang/prometheus" "github.com/stellar/go/support/db/dbtest" "github.com/stretchr/testify/assert" @@ -252,6 +254,25 @@ func TestIdleTransactionTimeout(t *testing.T) { assertAbendMetrics(reg, "libpq", "n/a", "error", assert) } +func TestDbServerErrorInMetrics(t *testing.T) { + assert := assert.New(t) + db := dbtest.Postgres(t).Load(testSchema) + defer db.Close() + + sessRaw := &Session{DB: db.Open()} + reg := prometheus.NewRegistry() + sess := RegisterMetrics(sessRaw, "test", "subtest", reg) + + defer sess.Close() + var pqErr *pq.Error + + // generate a server side sql state error + _, err := sess.ExecRaw(context.Background(), "oops, invalid sql") + assert.ErrorAs(err, &pqErr) + // should find the same sql state error in the metric condition label + assertAbendMetrics(reg, "db", "42601", "error", assert) +} + func TestSessionRollbackAfterContextCanceled(t *testing.T) { assert := assert.New(t) db := dbtest.Postgres(t).Load(testSchema) From 9cf1d45b97e43209afcc3d049777dc5cbe333f78 Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Mon, 4 Mar 2024 21:16:32 -0800 Subject: [PATCH 08/10] #5217: changed the metrics model to be ctx_error, db_error, db_error_extra --- support/db/metrics.go | 67 +++++++++++----------- support/db/session.go | 6 +- support/db/session_test.go | 110 ++++++++++++++++++++++--------------- 3 files changed, 102 insertions(+), 81 deletions(-) diff --git a/support/db/metrics.go b/support/db/metrics.go index 47f17b8b62..e36988cb35 100644 
--- a/support/db/metrics.go +++ b/support/db/metrics.go @@ -60,7 +60,7 @@ type SessionWithMetrics struct { maxLifetimeClosedCounter prometheus.CounterFunc roundTripProbe *roundTripProbe roundTripTimeSummary prometheus.Summary - abendCounter *prometheus.CounterVec + errorCounter *prometheus.CounterVec } func RegisterMetrics(base *Session, namespace string, sub Subservice, registry *prometheus.Registry) SessionInterface { @@ -231,17 +231,17 @@ func RegisterMetrics(base *Session, namespace string, sub Subservice, registry * ) registry.MustRegister(s.maxLifetimeClosedCounter) - s.abendCounter = prometheus.NewCounterVec( + s.errorCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: namespace, Subsystem: "db", - Name: "abend_total", - Help: "total number of abends, details are captured in labels", + Name: "error_total", + Help: "total number of db related errors, details are captured in labels", ConstLabels: prometheus.Labels{"subservice": string(sub)}, }, - []string{"origin", "condition", "type"}, + []string{"ctx_error", "db_error", "db_error_extra"}, ) - registry.MustRegister(s.abendCounter) + registry.MustRegister(s.errorCounter) s.roundTripTimeSummary = prometheus.NewSummary( prometheus.SummaryOpts{ @@ -279,7 +279,7 @@ func (s *SessionWithMetrics) Close() error { s.registry.Unregister(s.maxIdleClosedCounter) s.registry.Unregister(s.maxIdleTimeClosedCounter) s.registry.Unregister(s.maxLifetimeClosedCounter) - s.registry.Unregister(s.abendCounter) + s.registry.Unregister(s.errorCounter) return s.SessionInterface.Close() } @@ -326,7 +326,7 @@ func (s *SessionWithMetrics) Clone() SessionInterface { maxIdleClosedCounter: s.maxIdleClosedCounter, maxIdleTimeClosedCounter: s.maxIdleTimeClosedCounter, maxLifetimeClosedCounter: s.maxLifetimeClosedCounter, - abendCounter: s.abendCounter, + errorCounter: s.errorCounter, } } @@ -378,40 +378,41 @@ func (s *SessionWithMetrics) handleErrorEvent(dbErr error, ctx context.Context) return } - // default the metric to 
based just on top level libpq error - abendOrigin := "libpq" - abendType := "error" - abendCondition := "n/a" - var pgDbErrorCode string + ctxError := "n/a" + dbError := "n/a" + errorExtra := "n/a" var pqErr *pq.Error - // apply db server error info if it exists - // libpq only provides a pg.Error if a server trip was made, otherwise it may not be present - if errors.As(dbErr, &pqErr) { - pgDbErrorCode = string(pqErr.Code) - abendOrigin = "db" - abendCondition = pgDbErrorCode + switch { + case errors.As(dbErr, &pqErr): + dbError = string(pqErr.Code) + switch pqErr.Message { + case "canceling statement due to user request": + errorExtra = "user_request" + case "canceling statement due to statement timeout": + errorExtra = "statement_timeout" + } + case strings.Contains(dbErr.Error(), "driver: bad connection"): + dbError = "driver_bad_connection" + case strings.Contains(dbErr.Error(), "sql: transaction has already been committed or rolled back"): + dbError = "tx_already_rollback" + case errors.Is(dbErr, context.Canceled): + dbError = "canceled" + case errors.Is(dbErr, context.DeadlineExceeded): + dbError = "deadline_exceeded" } - // apply remaining overrides to metric, when these specific points exist switch { case errors.Is(ctx.Err(), context.Canceled): - abendOrigin = "client_context" - abendType = "cancel" + ctxError = "canceled" case errors.Is(ctx.Err(), context.DeadlineExceeded): - abendOrigin = "horizon_context" - abendType = "timeout" - case pgDbErrorCode == "57014": - // if getting here, no context deadline happened, but - // the db reported query_canceled, which leaves only the possibility of - // db-side statement timeout was triggered - abendType = "timeout" + ctxError = "deadline_exceeded" } - s.abendCounter.With(prometheus.Labels{ - "origin": abendOrigin, - "condition": abendCondition, - "type": abendType, + s.errorCounter.With(prometheus.Labels{ + "ctx_error": ctxError, + "db_error": dbError, + "db_error_extra": errorExtra, }).Inc() } diff --git 
a/support/db/session.go b/support/db/session.go index 02ff266ff4..472fc40a37 100644 --- a/support/db/session.go +++ b/support/db/session.go @@ -254,7 +254,7 @@ func (s *Session) handleError(dbErr error, ctx context.Context) error { handler(dbErr, ctx) } - var abendDbErrorCode pq.ErrorCode + var dbErrorCode pq.ErrorCode var pqErr *pq.Error // if libpql sends to server, and then any server side error is reported, @@ -262,7 +262,7 @@ func (s *Session) handleError(dbErr error, ctx context.Context) error { // even if the caller context generates a cancel/deadline error during the server trip, // libpq will only return an instance of pq.ErrorCode as a non-wrapped error if go_errors.As(dbErr, &pqErr) { - abendDbErrorCode = pqErr.Code + dbErrorCode = pqErr.Code } switch { @@ -280,7 +280,7 @@ func (s *Session) handleError(dbErr error, ctx context.Context) error { // when horizon's context times out(it's set to app connection-timeout), // it will trigger libpq to emit a wrapped err that has the deadline err return ErrTimeout - case abendDbErrorCode == "57014": + case dbErrorCode == "57014": // https://www.postgresql.org/docs/12/errcodes-appendix.html, query_canceled // this code can be generated for multiple cases, // by libpq sending a signal to server when it experiences a context cancel/deadline diff --git a/support/db/session_test.go b/support/db/session_test.go index c99bd205d3..cb7963cbf5 100644 --- a/support/db/session_test.go +++ b/support/db/session_test.go @@ -41,8 +41,7 @@ func TestContextTimeoutDuringSql(t *testing.T) { }() require.Eventually(t, func() bool { wg.Wait(); return true }, 5*time.Second, time.Second) - // note, condition is populated with the db error, since a trip to server was made with sql running at time of deadline exceeded - assertAbendMetrics(reg, "horizon_context", "57014", "timeout", assert) + assertDbErrorMetrics(reg, "deadline_exceeded", "57014", "user_request", assert) } func TestContextTimeoutBeforeSql(t *testing.T) { @@ -65,8 +64,7 @@ 
func TestContextTimeoutBeforeSql(t *testing.T) { time.Sleep(500 * time.Millisecond) err := sess.GetRaw(ctx, &count, "SELECT pg_sleep(5) FROM people") assert.ErrorIs(err, ErrTimeout, "any db server operation should return error immediately if context already timed out") - // note, the condition is empty, the sql never made it to db, libpq short-circuited it based on ctx already deadlined - assertAbendMetrics(reg, "horizon_context", "n/a", "timeout", assert) + assertDbErrorMetrics(reg, "deadline_exceeded", "deadline_exceeded", "n/a", assert) } func TestContextCancelledBeforeSql(t *testing.T) { @@ -89,8 +87,7 @@ func TestContextCancelledBeforeSql(t *testing.T) { cancel() err := sess.GetRaw(ctx, &count, "SELECT pg_sleep(2), COUNT(*) FROM people") assert.ErrorIs(err, ErrCancelled, "any db server operation should return error immediately if user already cancel") - // note, the condition is empty, the sql never made it to db, libpq short-circuited it based on ctx already canceled - assertAbendMetrics(reg, "client_context", "n/a", "cancel", assert) + assertDbErrorMetrics(reg, "canceled", "canceled", "n/a", assert) } func TestContextCancelDuringSql(t *testing.T) { @@ -122,8 +119,7 @@ func TestContextCancelDuringSql(t *testing.T) { cancel() require.Eventually(t, func() bool { wg.Wait(); return true }, 5*time.Second, time.Second) - // note, condition is populated with the db error, since a trip to server was made with sql running at time of ctx cancel - assertAbendMetrics(reg, "client_context", "57014", "cancel", assert) + assertDbErrorMetrics(reg, "canceled", "57014", "user_request", assert) } func TestStatementTimeout(t *testing.T) { @@ -141,8 +137,7 @@ func TestStatementTimeout(t *testing.T) { var count int err = sess.GetRaw(context.Background(), &count, "SELECT pg_sleep(2) FROM people") assert.ErrorIs(err, ErrStatementTimeout) - // if the metric is source=db and condition=57014, then it's a statement timeout on the server - assertAbendMetrics(reg, "db", "57014", 
"timeout", assert) + assertDbErrorMetrics(reg, "n/a", "57014", "statement_timeout", assert) } func TestSession(t *testing.T) { @@ -230,7 +225,7 @@ func TestSession(t *testing.T) { assert.Equal("$1 = $2 = $3 = ?", out) } - assertZeroAbendMetrics(reg, assert) + assertZeroErrorMetrics(reg, assert) } func TestIdleTransactionTimeout(t *testing.T) { @@ -251,43 +246,66 @@ func TestIdleTransactionTimeout(t *testing.T) { var count int err = sess.GetRaw(context.Background(), &count, "SELECT COUNT(*) FROM people") assert.ErrorIs(err, ErrBadConnection) - assertAbendMetrics(reg, "libpq", "n/a", "error", assert) + assertDbErrorMetrics(reg, "n/a", "driver_bad_connection", "n/a", assert) } -func TestDbServerErrorInMetrics(t *testing.T) { +func TestIdleTransactionTimeoutAndContextTimeout(t *testing.T) { assert := assert.New(t) db := dbtest.Postgres(t).Load(testSchema) defer db.Close() - sessRaw := &Session{DB: db.Open()} + var cancel context.CancelFunc + ctx := context.Background() + ctx, cancel = context.WithTimeout(ctx, 150*time.Millisecond) + + sessRaw, err := Open(db.Dialect, db.DSN, IdleTransactionTimeout(100*time.Millisecond)) + assert.NoError(err) reg := prometheus.NewRegistry() sess := RegisterMetrics(sessRaw, "test", "subtest", reg) defer sess.Close() - var pqErr *pq.Error + defer cancel() - // generate a server side sql state error - _, err := sess.ExecRaw(context.Background(), "oops, invalid sql") - assert.ErrorAs(err, &pqErr) - // should find the same sql state error in the metric condition label - assertAbendMetrics(reg, "db", "42601", "error", assert) + var wg sync.WaitGroup + wg.Add(1) + assert.NoError(sess.Begin(context.Background())) + + <-time.After(200 * time.Millisecond) + + go func() { + _, err := sess.ExecRaw(ctx, "SELECT pg_sleep(5) FROM people") + assert.ErrorIs(err, ErrTimeout, "long running db server operation past context timeout, should return timeout") + wg.Done() + }() + + require.Eventually(t, func() bool { wg.Wait(); return true }, 5*time.Second, 
time.Second) + // this demonstrates the subtlety of libpq error handling: + // first a server session was created + // 100ms elapsed and idle server session was triggered on server side, server sent signal back to libpq, libpq marks the session locally as bad + // 150ms caller ctx deadlined + // now caller invokes libpq and tries to submit a sql statement on the now-closed session with deadlined ctx also + // libpq only reports an error of deadline exceeded; it will not emit the driver_bad_connection due to closed server session + assertDbErrorMetrics(reg, "deadline_exceeded", "deadline_exceeded", "n/a", assert) } -func TestSessionRollbackAfterContextCanceled(t *testing.T) { +func TestDbServerErrorInMetrics(t *testing.T) { assert := assert.New(t) db := dbtest.Postgres(t).Load(testSchema) defer db.Close() - sessRaw := setupRolledbackTx(t, db) + sessRaw := &Session{DB: db.Open()} reg := prometheus.NewRegistry() sess := RegisterMetrics(sessRaw, "test", "subtest", reg) + defer sess.Close() + var pqErr *pq.Error - assert.ErrorIs(sess.Rollback(), ErrAlreadyRolledback) - assertAbendMetrics(reg, "libpq", "n/a", "error", assert) + _, err := sess.ExecRaw(context.Background(), "oops, invalid sql") + assert.ErrorAs(err, &pqErr) + assertDbErrorMetrics(reg, "n/a", "42601", "n/a", assert) } -func TestSessionCommitAfterContextCanceled(t *testing.T) { +func TestSessionAfterRollback(t *testing.T) { assert := assert.New(t) db := dbtest.Postgres(t).Load(testSchema) defer db.Close() @@ -297,52 +315,54 @@ func TestSessionCommitAfterContextCanceled(t *testing.T) { sess := RegisterMetrics(sessRaw, "test", "subtest", reg) defer sess.Close() - assert.ErrorIs(sess.Commit(), ErrAlreadyRolledback) - assertAbendMetrics(reg, "libpq", "n/a", "error", assert) + var count int + err := sess.GetRaw(context.Background(), &count, "SELECT COUNT(*) FROM people") + assert.ErrorIs(err, ErrAlreadyRolledback) + assertDbErrorMetrics(reg, "n/a", "tx_already_rollback", "n/a", assert) } -func
assertZeroAbendMetrics(reg *prometheus.Registry, assert *assert.Assertions) { +func assertZeroErrorMetrics(reg *prometheus.Registry, assert *assert.Assertions) { metrics, err := reg.Gather() assert.NoError(err) for _, metricFamily := range metrics { - if metricFamily.GetName() == "test_db_abend_total" { - assert.Fail("abend_total metrics should not be present, never incremented") + if metricFamily.GetName() == "test_db_error_total" { + assert.Fail("error_total metrics should not be present, never incremented") } } } -func assertAbendMetrics(reg *prometheus.Registry, assertOrigin, assertCondition, assertType string, assert *assert.Assertions) { +func assertDbErrorMetrics(reg *prometheus.Registry, assertCtxError, assertDbError, assertDbErrorExtra string, assert *assert.Assertions) { metrics, err := reg.Gather() assert.NoError(err) for _, metricFamily := range metrics { - if metricFamily.GetName() == "test_db_abend_total" { + if metricFamily.GetName() == "test_db_error_total" { assert.Len(metricFamily.GetMetric(), 1) assert.Equal(metricFamily.GetMetric()[0].GetCounter().GetValue(), float64(1)) - var origin = "" - var condition = "" - var abend_type = "" + var ctxError = "" + var dbError = "" + var dbErrorExtra = "" for _, label := range metricFamily.GetMetric()[0].GetLabel() { - if label.GetName() == "origin" { - origin = label.GetValue() + if label.GetName() == "ctx_error" { + ctxError = label.GetValue() } - if label.GetName() == "condition" { - condition = label.GetValue() + if label.GetName() == "db_error" { + dbError = label.GetValue() } - if label.GetName() == "type" { - abend_type = label.GetValue() + if label.GetName() == "db_error_extra" { + dbErrorExtra = label.GetValue() } } - assert.Equal(origin, assertOrigin) - assert.Equal(condition, assertCondition) - assert.Equal(abend_type, assertType) + assert.Equal(ctxError, assertCtxError) + assert.Equal(dbError, assertDbError) + assert.Equal(dbErrorExtra, assertDbErrorExtra) return } } - assert.Fail("abend_total 
metrics were not correct") + assert.Fail("error_total metrics were not correct") } func setupRolledbackTx(t *testing.T, db *dbtest.DB) *Session { From 6bac50139f70b3bc93661e5bbd66e8974ed3bfef Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Mon, 4 Mar 2024 21:48:50 -0800 Subject: [PATCH 09/10] #5217: added test for db_error 'other' case --- support/db/metrics.go | 4 ++-- support/db/session_test.go | 21 ++++++++++++++++++++- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/support/db/metrics.go b/support/db/metrics.go index e36988cb35..5abfe3013a 100644 --- a/support/db/metrics.go +++ b/support/db/metrics.go @@ -369,7 +369,7 @@ func getQueryType(ctx context.Context, query squirrel.Sqlizer) QueryType { return UndefinedQueryType } -// derive the db 'abend_total' metric from the err returned by libpq sdk +// derive the db 'error_total' metric from the err returned by libpq // // dbErr - the error returned by any libpq method call // ctx - the caller's context used on libpb method call @@ -379,7 +379,7 @@ func (s *SessionWithMetrics) handleErrorEvent(dbErr error, ctx context.Context) } ctxError := "n/a" - dbError := "n/a" + dbError := "other" errorExtra := "n/a" var pqErr *pq.Error diff --git a/support/db/session_test.go b/support/db/session_test.go index cb7963cbf5..1fd2a3902b 100644 --- a/support/db/session_test.go +++ b/support/db/session_test.go @@ -288,7 +288,7 @@ func TestIdleTransactionTimeoutAndContextTimeout(t *testing.T) { assertDbErrorMetrics(reg, "deadline_exceeded", "deadline_exceeded", "n/a", assert) } -func TestDbServerErrorInMetrics(t *testing.T) { +func TestDbServerErrorCodeInMetrics(t *testing.T) { assert := assert.New(t) db := dbtest.Postgres(t).Load(testSchema) defer db.Close() @@ -305,6 +305,25 @@ func TestDbServerErrorInMetrics(t *testing.T) { assertDbErrorMetrics(reg, "n/a", "42601", "n/a", assert) } +func TestDbOtherErrorInMetrics(t *testing.T) { + assert := assert.New(t) + db := dbtest.Postgres(t).Load(testSchema) + defer 
db.Close() + + conn := db.Open() + conn.Close() + sessRaw := &Session{DB: conn} + reg := prometheus.NewRegistry() + sess := RegisterMetrics(sessRaw, "test", "subtest", reg) + + defer sess.Close() + + var count int + err := sess.GetRaw(context.Background(), &count, "SELECT COUNT(*) FROM people") + assert.ErrorContains(err, "sql: database is closed") + assertDbErrorMetrics(reg, "n/a", "other", "n/a", assert) +} + func TestSessionAfterRollback(t *testing.T) { assert := assert.New(t) db := dbtest.Postgres(t).Load(testSchema) From 4c531c629af87c8495d42c6c0a2c2368b0d289cb Mon Sep 17 00:00:00 2001 From: Shawn Reuland Date: Tue, 5 Mar 2024 11:45:55 -0800 Subject: [PATCH 10/10] #5217: updated changelog --- services/horizon/CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/services/horizon/CHANGELOG.md b/services/horizon/CHANGELOG.md index cd411b7120..6bb186b393 100644 --- a/services/horizon/CHANGELOG.md +++ b/services/horizon/CHANGELOG.md @@ -3,6 +3,11 @@ All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/). +## unreleased + +### Added +- New `db_error_total` metrics key with labels `ctx_error`, `db_error`, and `db_error_extra` ([5225](https://github.com/stellar/go/pull/5225)). + ## 2.28.3 ### Fixed