support/db: Delay canceling queries from client side when there's a statement / transaction timeout configured in postgres #5223

Merged
merged 13 commits into from
Mar 14, 2024
1 change: 1 addition & 0 deletions services/horizon/internal/app.go
@@ -532,6 +532,7 @@ func (a *App) init() error {
SSEUpdateFrequency: a.config.SSEUpdateFrequency,
StaleThreshold: a.config.StaleThreshold,
ConnectionTimeout: a.config.ConnectionTimeout,
CancelDBQueryTimeout: a.config.CancelDBQueryTimeout,
MaxHTTPRequestSize: a.config.MaxHTTPRequestSize,
NetworkPassphrase: a.config.NetworkPassphrase,
MaxPathLength: a.config.MaxPathLength,
5 changes: 3 additions & 2 deletions services/horizon/internal/config.go
@@ -36,8 +36,9 @@ type Config struct {
HorizonDBMaxOpenConnections int
HorizonDBMaxIdleConnections int

- SSEUpdateFrequency time.Duration
- ConnectionTimeout time.Duration
+ SSEUpdateFrequency time.Duration
+ ConnectionTimeout time.Duration
+ CancelDBQueryTimeout time.Duration
// MaxHTTPRequestSize is the maximum allowed request payload size
MaxHTTPRequestSize uint
RateQuota *throttled.RateQuota
17 changes: 17 additions & 0 deletions services/horizon/internal/flags.go
@@ -437,6 +437,18 @@ func Flags() (*Config, support.ConfigOptions) {
Usage: "defines the timeout of connection after which 504 response will be sent or stream will be closed, if Horizon is behind a load balancer with idle connection timeout, this should be set to a few seconds less than idle timeout, does not apply to POST /transactions",
UsedInCommands: ApiServerCommands,
},
&support.ConfigOption{
Name: "cancel-db-query-timeout",
Contributor

@Shaptic Shaptic Mar 12, 2024

Just driving by: can we simplify the name to query-timeout? I think it conveys the same meaning with less verbosity.

Contributor Author

@Shaptic how about cancel-query-timeout? I would like to emphasize that this timeout only governs client-side query canceling, unlike the postgres server-side statement timeout.

Contributor

sure, good point! maybe client-query-timeout then? I guess in my mind I read the flag as "cancel query" and think "what the heck is a cancel query, and why does it need a timeout?"

(also to be clear I don't want to delay this PR at all)

ConfigKey: &config.CancelDBQueryTimeout,
OptType: types.Int,
FlagDefault: 0,
CustomSetValue: support.SetDuration,
Usage: "defines the timeout for when horizon will cancel all postgres queries connected to an HTTP request. The timeout is measured in seconds since the start of the HTTP request. Note, this timeout does not apply to POST /transactions. " +
"The difference between cancel-db-query-timeout and connection-timeout is that connection-timeout applies a postgres statement timeout whereas cancel-db-query-timeout will send an additional request to postgres to cancel the ongoing query. " +
"Generally, cancel-db-query-timeout should be configured to be higher than connection-timeout to allow the postgres statement timeout to kill long running queries without having to send the additional cancel request to postgres. " +
"By default, cancel-db-query-timeout will be set to twice the connection-timeout.",
Contributor

Suggested change
- "By default, cancel-db-query-timeout will be set to twice the connection-timeout.",
+ "When set to 0, which is the default, it will trigger cancel-db-query-timeout to be set to twice the connection-timeout.",

Contributor

Minor: should the user be able to disable this flag? I see later in the middleware that 0 equates to skipping application of the flag, but as of now that path would never be exercised.

Contributor Author

I didn't want to allow the flag to be disabled because I think it could result in confusion. If we skip the application of this flag, the connection timeout will apply to both the postgres server-side statement timeouts and the query cancelations on the horizon side. I think this would be unintuitive because, as a user, my first thought is that disabling the query cancelation timeout would result in no query cancelation whatsoever.

If we wanted to allow the option of having no query cancelation at all, we could equate 0 to that behavior, and -1 could be the default, meaning cancel-db-query-timeout will be set to twice the connection-timeout value. The user still has the option of setting cancel-db-query-timeout and connection-timeout to the same value, which will result in more or less the status quo behavior of horizon prior to this PR.

UsedInCommands: ApiServerCommands,
},
&support.ConfigOption{
Name: "max-http-request-size",
ConfigKey: &config.MaxHTTPRequestSize,
@@ -983,5 +995,10 @@ func ApplyFlags(config *Config, flags support.ConfigOptions, options ApplyOption
" If Horizon is behind both, use --behind-cloudflare only")
}

if config.CancelDBQueryTimeout == 0 {
// the default value for cancel-db-query-timeout is twice the connection-timeout
config.CancelDBQueryTimeout = config.ConnectionTimeout * 2
}

return nil
}
16 changes: 13 additions & 3 deletions services/horizon/internal/httpx/middleware.go
@@ -197,14 +197,15 @@ func recoverMiddleware(h http.Handler) http.Handler {
// NewHistoryMiddleware adds session to the request context and ensures Horizon
// is not in a stale state, which is when the difference between latest core
// ledger and latest history ledger is higher than the given threshold
- func NewHistoryMiddleware(ledgerState *ledger.State, staleThreshold int32, session db.SessionInterface) func(http.Handler) http.Handler {
+ func NewHistoryMiddleware(ledgerState *ledger.State, staleThreshold int32, session db.SessionInterface, contextDBTimeout time.Duration) func(http.Handler) http.Handler {
return func(h http.Handler) http.Handler {

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
if routePattern := supportHttp.GetChiRoutePattern(r); routePattern != "" {
ctx = context.WithValue(ctx, &db.RouteContextKey, routePattern)
}
ctx = setContextDBTimeout(contextDBTimeout, ctx)
if staleThreshold > 0 {
ls := ledgerState.CurrentStatus()
isStale := (ls.CoreLatest - ls.HistoryLatest) > int32(staleThreshold)
@@ -237,8 +238,9 @@ func NewHistoryMiddleware(ledgerState *ledger.State, staleThreshold int32, sessi
// has been verified and is correct (Otherwise returns `500 Internal Server Error` to prevent
// returning invalid data to the user)
type StateMiddleware struct {
- HorizonSession db.SessionInterface
- NoStateVerification bool
+ HorizonSession db.SessionInterface
+ CancelDBQueryTimeout time.Duration
+ NoStateVerification bool
}

func ingestionStatus(ctx context.Context, q *history.Q) (uint32, bool, error) {
@@ -276,6 +278,7 @@ func (m *StateMiddleware) WrapFunc(h http.HandlerFunc) http.HandlerFunc {
if routePattern := supportHttp.GetChiRoutePattern(r); routePattern != "" {
ctx = context.WithValue(ctx, &db.RouteContextKey, routePattern)
}
ctx = setContextDBTimeout(m.CancelDBQueryTimeout, ctx)
session := m.HorizonSession.Clone()
q := &history.Q{session}
sseRequest := render.Negotiate(r) == render.MimeEventStream
@@ -344,6 +347,13 @@ func (m *StateMiddleware) WrapFunc(h http.HandlerFunc) http.HandlerFunc {
}
}

func setContextDBTimeout(timeout time.Duration, ctx context.Context) context.Context {
if timeout > 0 {
ctx = context.WithValue(ctx, &db.DeadlineCtxKey, time.Now().Add(timeout))
}
return ctx
}

// Wrap executes the middleware on a given HTTP handler
func (m *StateMiddleware) Wrap(h http.Handler) http.Handler {
return m.WrapFunc(h.ServeHTTP)
6 changes: 4 additions & 2 deletions services/horizon/internal/httpx/router.go
@@ -38,6 +38,7 @@ type RouterConfig struct {
SSEUpdateFrequency time.Duration
StaleThreshold uint
ConnectionTimeout time.Duration
CancelDBQueryTimeout time.Duration
MaxHTTPRequestSize uint
NetworkPassphrase string
MaxPathLength uint
@@ -139,7 +140,8 @@ func (r *Router) addMiddleware(config *RouterConfig,

func (r *Router) addRoutes(config *RouterConfig, rateLimiter *throttled.HTTPRateLimiter, ledgerState *ledger.State) {
stateMiddleware := StateMiddleware{
- HorizonSession: config.DBSession,
+ HorizonSession: config.DBSession,
+ CancelDBQueryTimeout: config.CancelDBQueryTimeout,
}

r.Method(http.MethodGet, "/health", config.HealthCheck)
@@ -157,7 +159,7 @@ func (r *Router) addRoutes(config *RouterConfig, rateLimiter *throttled.HTTPRate
LedgerSourceFactory: historyLedgerSourceFactory{ledgerState: ledgerState, updateFrequency: config.SSEUpdateFrequency},
}

- historyMiddleware := NewHistoryMiddleware(ledgerState, int32(config.StaleThreshold), config.DBSession)
+ historyMiddleware := NewHistoryMiddleware(ledgerState, int32(config.StaleThreshold), config.DBSession, config.CancelDBQueryTimeout)
// State endpoints behind stateMiddleware
r.Group(func(r chi.Router) {
r.Route("/accounts", func(r chi.Router) {
22 changes: 7 additions & 15 deletions services/horizon/internal/init.go
@@ -45,38 +45,29 @@ func mustInitHorizonDB(app *App) {
log.Fatalf("max open connections to horizon db must be greater than %d", ingest.MaxDBConnections)
}
}
serverSidePGTimeoutConfigs := []db.ClientConfig{
db.StatementTimeout(app.config.ConnectionTimeout),
db.IdleTransactionTimeout(app.config.ConnectionTimeout),
}
Comment on lines +48 to +51
Contributor Author

the ingestion system uses a separate connection pool

HistorySession: mustNewDBSession(
db.IngestSubservice, app.config.DatabaseURL, ingest.MaxDBConnections, ingest.MaxDBConnections, app.prometheusRegistry,
),

so we can apply these timeout settings to app.historyQ and app.primaryHistoryQ because we know that those db connections will never be used in ingestion and will only be used when serving HTTP requests


if app.config.RoDatabaseURL == "" {
- var clientConfigs []db.ClientConfig
- if !app.config.Ingest {
- // if we are not ingesting then we don't expect to have long db queries / transactions
- clientConfigs = append(
- clientConfigs,
- db.StatementTimeout(app.config.ConnectionTimeout),
- db.IdleTransactionTimeout(app.config.ConnectionTimeout),
- )
- }
app.historyQ = &history.Q{mustNewDBSession(
db.HistorySubservice,
app.config.DatabaseURL,
maxIdle,
maxOpen,
app.prometheusRegistry,
- clientConfigs...,
+ serverSidePGTimeoutConfigs...,
)}
} else {
// If RO set, use it for all DB queries
- roClientConfigs := []db.ClientConfig{
- db.StatementTimeout(app.config.ConnectionTimeout),
- db.IdleTransactionTimeout(app.config.ConnectionTimeout),
- }
app.historyQ = &history.Q{mustNewDBSession(
db.HistorySubservice,
app.config.RoDatabaseURL,
maxIdle,
maxOpen,
app.prometheusRegistry,
- roClientConfigs...,
+ serverSidePGTimeoutConfigs...,
)}

app.primaryHistoryQ = &history.Q{mustNewDBSession(
@@ -85,6 +76,7 @@ func mustInitHorizonDB(app *App) {
maxIdle,
maxOpen,
app.prometheusRegistry,
serverSidePGTimeoutConfigs...,
)}
}
}
2 changes: 1 addition & 1 deletion services/horizon/internal/middleware_test.go
@@ -402,7 +402,7 @@ func TestCheckHistoryStaleMiddleware(t *testing.T) {
}
ledgerState := &ledger.State{}
ledgerState.SetStatus(state)
- historyMiddleware := httpx.NewHistoryMiddleware(ledgerState, testCase.staleThreshold, tt.HorizonSession())
+ historyMiddleware := httpx.NewHistoryMiddleware(ledgerState, testCase.staleThreshold, tt.HorizonSession(), 0)
handler := chi.NewRouter()
handler.With(historyMiddleware).MethodFunc("GET", "/", endpoint)
w := httptest.NewRecorder()
2 changes: 1 addition & 1 deletion staticcheck.sh
@@ -1,7 +1,7 @@
#! /bin/bash
set -e

- version='2023.1.1'
+ version='2023.1.7'

staticcheck='go run honnef.co/go/tools/cmd/staticcheck@'"$version"

2 changes: 2 additions & 0 deletions support/db/main.go
@@ -21,6 +21,7 @@ import (

"github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"

"github.com/stellar/go/support/errors"

// Enable postgres
@@ -119,6 +120,7 @@ type Session struct {
DB *sqlx.DB

tx *sqlx.Tx
txCancel context.CancelFunc
txOptions *sql.TxOptions
errorHandlers []ErrorHandlerFunc
}
76 changes: 72 additions & 4 deletions support/db/session.go
@@ -12,28 +12,60 @@ import (
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"

"github.com/stellar/go/support/db/sqlutils"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
)

var DeadlineCtxKey = CtxKey("deadline")

func noop() {}

// context() checks if there is an override on the context timeout which is configured using DeadlineCtxKey.
// If the override exists, we return a new context with the desired deadline. Otherwise, we return the
// original context.
// Note that the override will not be applied if requestCtx has already been terminated.
func (s *Session) context(requestCtx context.Context) (context.Context, context.CancelFunc, error) {
deadline, ok := requestCtx.Value(&DeadlineCtxKey).(time.Time)
if !ok {
return requestCtx, noop, nil
}

// if requestCtx is already terminated don't proceed with the db statement
if requestCtx.Err() != nil {
return requestCtx, noop, requestCtx.Err()
}

ctx, cancel := context.WithDeadline(context.Background(), deadline)
return ctx, cancel, nil
}

// Begin binds this session to a new transaction.
func (s *Session) Begin(ctx context.Context) error {
if s.tx != nil {
return errors.New("already in transaction")
}
ctx, cancel, err := s.context(ctx)
if err != nil {
cancel()
return err
}

tx, err := s.DB.BeginTxx(ctx, nil)
if err != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
cancel()
return knownErr
}

cancel()
return errors.Wrap(err, "beginx failed")
}
log.Debug("sql: begin")
s.tx = tx
s.txOptions = nil
s.txCancel = cancel
return nil
}

@@ -43,19 +75,27 @@ func (s *Session) BeginTx(ctx context.Context, opts *sql.TxOptions) error {
if s.tx != nil {
return errors.New("already in transaction")
}
ctx, cancel, err := s.context(ctx)
if err != nil {
cancel()
return err
}

tx, err := s.DB.BeginTxx(ctx, opts)
if err != nil {
if knownErr := s.handleError(err, ctx); knownErr != nil {
cancel()
return knownErr
}

cancel()
return errors.Wrap(err, "beginTx failed")
}
log.Debug("sql: begin")

s.tx = tx
s.txOptions = opts
s.txCancel = cancel
return nil
}

@@ -93,6 +133,8 @@ func (s *Session) Commit() error {
log.Debug("sql: commit")
s.tx = nil
s.txOptions = nil
s.txCancel()
s.txCancel = nil

if knownErr := s.handleError(err, context.Background()); knownErr != nil {
return knownErr
@@ -135,7 +177,13 @@ func (s *Session) Get(ctx context.Context, dest interface{}, query sq.Sqlizer) e
// GetRaw runs `query` with `args`, setting the first result found on
// `dest`, if any.
func (s *Session) GetRaw(ctx context.Context, dest interface{}, query string, args ...interface{}) error {
- query, err := s.ReplacePlaceholders(query)
+ ctx, cancel, err := s.context(ctx)
+ defer cancel()
+ if err != nil {
+ return err
+ }
+
+ query, err = s.ReplacePlaceholders(query)
if err != nil {
return errors.Wrap(err, "replace placeholders failed")
}
@@ -204,7 +252,13 @@ func (s *Session) ExecAll(ctx context.Context, script string) error {

// ExecRaw runs `query` with `args`
func (s *Session) ExecRaw(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
- query, err := s.ReplacePlaceholders(query)
+ ctx, cancel, err := s.context(ctx)
+ defer cancel()
+ if err != nil {
+ return nil, err
+ }
+
+ query, err = s.ReplacePlaceholders(query)
if err != nil {
return nil, errors.Wrap(err, "replace placeholders failed")
}
@@ -304,7 +358,13 @@ func (s *Session) Query(ctx context.Context, query sq.Sqlizer) (*sqlx.Rows, erro

// QueryRaw runs `query` with `args`
func (s *Session) QueryRaw(ctx context.Context, query string, args ...interface{}) (*sqlx.Rows, error) {
- query, err := s.ReplacePlaceholders(query)
+ ctx, cancel, err := s.context(ctx)
+ defer cancel()
+ if err != nil {
+ return nil, err
+ }
+
+ query, err = s.ReplacePlaceholders(query)
if err != nil {
return nil, errors.Wrap(err, "replace placeholders failed")
}
@@ -350,6 +410,8 @@ func (s *Session) Rollback() error {
log.Debug("sql: rollback")
s.tx = nil
s.txOptions = nil
s.txCancel()
s.txCancel = nil

if knownErr := s.handleError(err, context.Background()); knownErr != nil {
return knownErr
@@ -381,8 +443,14 @@ func (s *Session) SelectRaw(
query string,
args ...interface{},
) error {
+ ctx, cancel, err := s.context(ctx)
+ defer cancel()
+ if err != nil {
+ return err
+ }
+
s.clearSliceIfPossible(dest)
- query, err := s.ReplacePlaceholders(query)
+ query, err = s.ReplacePlaceholders(query)
if err != nil {
return errors.Wrap(err, "replace placeholders failed")
}