Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Add new metrics counters for db connection close events #5225

Merged
merged 13 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
All notable changes to this project will be documented in this
file. This project adheres to [Semantic Versioning](http://semver.org/).

## unreleased

### Added
- New `db_error_total` metrics key with labels `ctx_error`, `db_error`, and `db_error_extra` ([5225](https://github.com/stellar/go/pull/5225)).

## 2.28.3

### Fixed
Expand Down
8 changes: 6 additions & 2 deletions support/db/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,14 @@ type Session struct {
// DB is the database connection that queries should be executed against.
DB *sqlx.DB

tx *sqlx.Tx
txOptions *sql.TxOptions
tx *sqlx.Tx
txOptions *sql.TxOptions
errorHandlers []ErrorHandlerFunc
}

// dbErr - the Postgres error
// ctx - the caller's context
type ErrorHandlerFunc func(dbErr error, ctx context.Context)
type SessionInterface interface {
BeginTx(ctx context.Context, opts *sql.TxOptions) error
Begin(ctx context.Context) error
Expand Down
73 changes: 66 additions & 7 deletions support/db/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package db
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"time"

"github.com/Masterminds/squirrel"
"github.com/lib/pq"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -58,6 +60,7 @@ type SessionWithMetrics struct {
maxLifetimeClosedCounter prometheus.CounterFunc
roundTripProbe *roundTripProbe
roundTripTimeSummary prometheus.Summary
errorCounter *prometheus.CounterVec
}

func RegisterMetrics(base *Session, namespace string, sub Subservice, registry *prometheus.Registry) SessionInterface {
Expand All @@ -66,6 +69,8 @@ func RegisterMetrics(base *Session, namespace string, sub Subservice, registry *
registry: registry,
}

base.AddErrorHandler(s.handleErrorEvent)

s.queryCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Expand Down Expand Up @@ -226,6 +231,18 @@ func RegisterMetrics(base *Session, namespace string, sub Subservice, registry *
)
registry.MustRegister(s.maxLifetimeClosedCounter)

s.errorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: "db",
Name: "error_total",
Help: "total number of db related errors, details are captured in labels",
ConstLabels: prometheus.Labels{"subservice": string(sub)},
},
[]string{"ctx_error", "db_error", "db_error_extra"},
)
registry.MustRegister(s.errorCounter)

s.roundTripTimeSummary = prometheus.NewSummary(
prometheus.SummaryOpts{
Namespace: namespace,
Expand Down Expand Up @@ -262,15 +279,10 @@ func (s *SessionWithMetrics) Close() error {
s.registry.Unregister(s.maxIdleClosedCounter)
s.registry.Unregister(s.maxIdleTimeClosedCounter)
s.registry.Unregister(s.maxLifetimeClosedCounter)
s.registry.Unregister(s.errorCounter)
return s.SessionInterface.Close()
}

// TODO: Implement these
// func (s *SessionWithMetrics) BeginTx(ctx context.Context, opts *sql.TxOptions) error {
// func (s *SessionWithMetrics) Begin(ctx context.Context) error {
// func (s *SessionWithMetrics) Commit(ctx context.Context) error
// func (s *SessionWithMetrics) Rollback(ctx context.Context) error

func (s *SessionWithMetrics) TruncateTables(ctx context.Context, tables []string) (err error) {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
s.queryDurationSummary.With(prometheus.Labels{
Expand Down Expand Up @@ -314,6 +326,7 @@ func (s *SessionWithMetrics) Clone() SessionInterface {
maxIdleClosedCounter: s.maxIdleClosedCounter,
maxIdleTimeClosedCounter: s.maxIdleTimeClosedCounter,
maxLifetimeClosedCounter: s.maxLifetimeClosedCounter,
errorCounter: s.errorCounter,
}
}

Expand Down Expand Up @@ -356,6 +369,53 @@ func getQueryType(ctx context.Context, query squirrel.Sqlizer) QueryType {
return UndefinedQueryType
}

// derive the db 'error_total' metric from the err returned by libpq
//
// dbErr - the error returned by any libpq method call
// ctx - the caller's context used on libpb method call
func (s *SessionWithMetrics) handleErrorEvent(dbErr error, ctx context.Context) {
if dbErr == nil || s.NoRows(dbErr) {
return
}

ctxError := "n/a"
dbError := "other"
errorExtra := "n/a"
var pqErr *pq.Error

switch {
case errors.As(dbErr, &pqErr):
dbError = string(pqErr.Code)
switch pqErr.Message {
case "canceling statement due to user request":
errorExtra = "user_request"
case "canceling statement due to statement timeout":
errorExtra = "statement_timeout"
}
case strings.Contains(dbErr.Error(), "driver: bad connection"):
dbError = "driver_bad_connection"
case strings.Contains(dbErr.Error(), "sql: transaction has already been committed or rolled back"):
dbError = "tx_already_rollback"
case errors.Is(dbErr, context.Canceled):
dbError = "canceled"
case errors.Is(dbErr, context.DeadlineExceeded):
dbError = "deadline_exceeded"
}

switch {
case errors.Is(ctx.Err(), context.Canceled):
ctxError = "canceled"
case errors.Is(ctx.Err(), context.DeadlineExceeded):
ctxError = "deadline_exceeded"
sreuland marked this conversation as resolved.
Show resolved Hide resolved
}

s.errorCounter.With(prometheus.Labels{
"ctx_error": ctxError,
"db_error": dbError,
"db_error_extra": errorExtra,
}).Inc()
}

func (s *SessionWithMetrics) Get(ctx context.Context, dest interface{}, query squirrel.Sqlizer) (err error) {
queryType := string(getQueryType(ctx, query))
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
Expand All @@ -373,7 +433,6 @@ func (s *SessionWithMetrics) Get(ctx context.Context, dest interface{}, query sq
"route": contextRoute(ctx),
}).Inc()
}()

err = s.SessionInterface.Get(ctx, dest, query)
return err
}
Expand Down
81 changes: 57 additions & 24 deletions support/db/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package db
import (
"context"
"database/sql"
go_errors "errors"
sreuland marked this conversation as resolved.
Show resolved Hide resolved
"fmt"
"reflect"
"strings"
"time"

sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/stellar/go/support/db/sqlutils"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
Expand All @@ -23,7 +25,7 @@ func (s *Session) Begin(ctx context.Context) error {

tx, err := s.DB.BeginTxx(ctx, nil)
if err != nil {
if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
return knownErr
}

Expand All @@ -44,7 +46,7 @@ func (s *Session) BeginTx(ctx context.Context, opts *sql.TxOptions) error {

tx, err := s.DB.BeginTxx(ctx, opts)
if err != nil {
if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
return knownErr
}

Expand Down Expand Up @@ -92,7 +94,7 @@ func (s *Session) Commit() error {
s.tx = nil
s.txOptions = nil

if knownErr := s.replaceWithKnownError(err, context.Background()); knownErr != nil {
if knownErr := s.handleError(err, context.Background()); knownErr != nil {
return knownErr
}
return err
Expand Down Expand Up @@ -146,7 +148,7 @@ func (s *Session) GetRaw(ctx context.Context, dest interface{}, query string, ar
return nil
}

if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
return knownErr
}

Expand Down Expand Up @@ -215,7 +217,7 @@ func (s *Session) ExecRaw(ctx context.Context, query string, args ...interface{}
return result, nil
}

if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
return nil, knownErr
}

Expand All @@ -232,29 +234,60 @@ func (s *Session) NoRows(err error) bool {
return err == sql.ErrNoRows
}

// replaceWithKnownError tries to replace Postgres error with package error.
// Returns a new error if the err is known.
func (s *Session) replaceWithKnownError(err error, ctx context.Context) error {
if err == nil {
func (s *Session) AddErrorHandler(handler ErrorHandlerFunc) {
s.errorHandlers = append(s.errorHandlers, handler)
}

// handleError does housekeeping on errors from db.
// dbErr - the libpq client error
// ctx - the calling context
//
// tries to replace dbErr with horizon package error, returns a new error if the err is known.
// invokes any additional error handlers that may have been
// added to the session, passing the caller's context
func (s *Session) handleError(dbErr error, ctx context.Context) error {
if dbErr == nil {
return nil
}

for _, handler := range s.errorHandlers {
handler(dbErr, ctx)
}

var dbErrorCode pq.ErrorCode
var pqErr *pq.Error

// if libpql sends to server, and then any server side error is reported,
// libpq passes back only an pq.ErrorCode from method call
// even if the caller context generates a cancel/deadline error during the server trip,
// libpq will only return an instance of pq.ErrorCode as a non-wrapped error
if go_errors.As(dbErr, &pqErr) {
dbErrorCode = pqErr.Code
}

switch {
sreuland marked this conversation as resolved.
Show resolved Hide resolved
case ctx.Err() == context.Canceled:
return ErrCancelled
case ctx.Err() == context.DeadlineExceeded:
// if libpq waits too long to obtain conn from pool, can get ctx timeout before server trip
return ErrTimeout
case strings.Contains(err.Error(), "pq: canceling statement due to user request"):
return ErrTimeout
case strings.Contains(err.Error(), "pq: canceling statement due to conflict with recovery"):
case strings.Contains(dbErr.Error(), "pq: canceling statement due to conflict with recovery"):
return ErrConflictWithRecovery
case strings.Contains(err.Error(), "driver: bad connection"):
case strings.Contains(dbErr.Error(), "driver: bad connection"):
return ErrBadConnection
case strings.Contains(err.Error(), "pq: canceling statement due to statement timeout"):
sreuland marked this conversation as resolved.
Show resolved Hide resolved
return ErrStatementTimeout
case strings.Contains(err.Error(), "transaction has already been committed or rolled back"):
case strings.Contains(dbErr.Error(), "transaction has already been committed or rolled back"):
return ErrAlreadyRolledback
case go_errors.Is(ctx.Err(), context.Canceled):
// when horizon's context is cancelled by it's upstream api client,
// it will propagate to here and libpq will emit a wrapped err that has the cancel err
return ErrCancelled
case go_errors.Is(ctx.Err(), context.DeadlineExceeded):
// when horizon's context times out(it's set to app connection-timeout),
// it will trigger libpq to emit a wrapped err that has the deadline err
return ErrTimeout
case dbErrorCode == "57014":
// https://www.postgresql.org/docs/12/errcodes-appendix.html, query_canceled
// this code can be generated for multiple cases,
// by libpq sending a signal to server when it experiences a context cancel/deadline
// or it could happen based on just server statement_timeout setting
// since we check the context cancel/deadline err state first, getting here means
// this can only be from a statement timeout
return ErrStatementTimeout
default:
return nil
}
Expand Down Expand Up @@ -284,7 +317,7 @@ func (s *Session) QueryRaw(ctx context.Context, query string, args ...interface{
return result, nil
}

if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
return nil, knownErr
}

Expand Down Expand Up @@ -318,7 +351,7 @@ func (s *Session) Rollback() error {
s.tx = nil
s.txOptions = nil

if knownErr := s.replaceWithKnownError(err, context.Background()); knownErr != nil {
if knownErr := s.handleError(err, context.Background()); knownErr != nil {
return knownErr
}
return err
Expand Down Expand Up @@ -362,7 +395,7 @@ func (s *Session) SelectRaw(
return nil
}

if knownErr := s.replaceWithKnownError(err, ctx); knownErr != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
return knownErr
}

Expand Down
Loading
Loading