Skip to content

Commit

Permalink
Add timeout on horizon db ingestion statements
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms committed May 8, 2020
1 parent ebd7214 commit 3ccaf33
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 6 deletions.
4 changes: 4 additions & 0 deletions services/horizon/internal/expingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ const (
// MaxDBConnections is the size of the postgres connection pool dedicated to Horizon ingestion
MaxDBConnections = 2

dbSessionTimeout = 20 * time.Second

defaultCoreCursorName = "HORIZON"
stateVerificationErrorThreshold = 3
)
Expand Down Expand Up @@ -146,6 +148,7 @@ func NewSystem(config Config) (*System, error) {

coreSession := config.CoreSession.Clone()
coreSession.Ctx = ctx
coreSession.Timeout = dbSessionTimeout

ledgerBackend, err := ledgerbackend.NewDatabaseBackendFromSession(coreSession)
if err != nil {
Expand All @@ -155,6 +158,7 @@ func NewSystem(config Config) (*System, error) {

historyQ := &history.Q{config.HistorySession.Clone()}
historyQ.Ctx = ctx
historyQ.Timeout = dbSessionTimeout

historyAdapter := adapters.MakeHistoryArchiveAdapter(archive)

Expand Down
6 changes: 6 additions & 0 deletions support/db/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package db
import (
"context"
"database/sql"
"time"

"github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
Expand Down Expand Up @@ -101,6 +102,11 @@ type Session struct {
// Ctx is the context in which the repo is operating under.
Ctx context.Context

// Timeout is a timeout applied on all SQL Select queries and Exec statements.
// If Timeout is 0 then no timeout is applied. Note that the timeout is not
// applied on BEGIN, COMMIT, or ROLLBACK statements.
Timeout time.Duration

tx *sqlx.Tx
}

Expand Down
32 changes: 26 additions & 6 deletions support/db/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ func (s *Session) GetTx() *sqlx.Tx {
// source is currently within.
func (s *Session) Clone() *Session {
return &Session{
DB: s.DB,
Ctx: s.Ctx,
DB: s.DB,
Ctx: s.Ctx,
Timeout: s.Timeout,
}
}

Expand Down Expand Up @@ -120,6 +121,17 @@ func (s *Session) Get(dest interface{}, query sq.Sqlizer) error {
return s.GetRaw(dest, sql, args...)
}

func noop() {

}

func (s *Session) contextWithTimeout() (context.Context, context.CancelFunc) {
if s.Timeout == 0 {
return s.Ctx, noop
}
return context.WithTimeout(s.Ctx, s.Timeout)
}

// GetRaw runs `query` with `args`, setting the first result found on
// `dest`, if any.
func (s *Session) GetRaw(dest interface{}, query string, args ...interface{}) error {
Expand All @@ -129,7 +141,9 @@ func (s *Session) GetRaw(dest interface{}, query string, args ...interface{}) er
}

start := time.Now()
err = s.conn().GetContext(s.Ctx, dest, query, args...)
ctx, cancel := s.contextWithTimeout()
defer cancel()
err = s.conn().GetContext(ctx, dest, query, args...)
s.log("get", start, query, args)

if err == nil {
Expand Down Expand Up @@ -198,7 +212,9 @@ func (s *Session) ExecRaw(query string, args ...interface{}) (sql.Result, error)
}

start := time.Now()
result, err := s.conn().ExecContext(s.Ctx, query, args...)
ctx, cancel := s.contextWithTimeout()
defer cancel()
result, err := s.conn().ExecContext(ctx, query, args...)
s.log("exec", start, query, args)

if err == nil {
Expand Down Expand Up @@ -244,7 +260,9 @@ func (s *Session) QueryRaw(query string, args ...interface{}) (*sqlx.Rows, error
}

start := time.Now()
result, err := s.conn().QueryxContext(s.Ctx, query, args...)
ctx, cancel := s.contextWithTimeout()
defer cancel()
result, err := s.conn().QueryxContext(ctx, query, args...)
s.log("query", start, query, args)

if err == nil {
Expand Down Expand Up @@ -308,7 +326,9 @@ func (s *Session) SelectRaw(
}

start := time.Now()
err = s.conn().SelectContext(s.Ctx, dest, query, args...)
ctx, cancel := s.contextWithTimeout()
defer cancel()
err = s.conn().SelectContext(ctx, dest, query, args...)
s.log("select", start, query, args)

if err == nil {
Expand Down
13 changes: 13 additions & 0 deletions support/db/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,25 @@ package db
import (
"context"
"testing"
"time"

"github.com/stellar/go/support/db/dbtest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestClone(t *testing.T) {
session := &Session{Ctx: context.Background()}
sessionClone := session.Clone()
assert.Equal(t, time.Duration(0), sessionClone.Timeout)
assert.Equal(t, session.Ctx, sessionClone.Ctx)

session.Timeout = time.Second
sessionClone = session.Clone()
assert.Equal(t, session.Timeout, sessionClone.Timeout)
assert.Equal(t, session.Ctx, sessionClone.Ctx)
}

func TestSession(t *testing.T) {
db := dbtest.Postgres(t).Load(testSchema)
defer db.Close()
Expand Down

0 comments on commit 3ccaf33

Please sign in to comment.