diff --git a/services/horizon/internal/app.go b/services/horizon/internal/app.go index 10bca22743..475cb552c0 100644 --- a/services/horizon/internal/app.go +++ b/services/horizon/internal/app.go @@ -56,6 +56,7 @@ type App struct { config Config webServer *httpx.Server historyQ *history.Q + primaryHistoryQ *history.Q ctx context.Context cancel func() horizonVersion string @@ -502,6 +503,10 @@ func (a *App) init() error { }, } + if a.primaryHistoryQ != nil { + routerConfig.PrimaryDBSession = a.primaryHistoryQ.Session + } + var err error config := httpx.ServerConfig{ Port: uint16(a.config.Port), diff --git a/services/horizon/internal/config.go b/services/horizon/internal/config.go index 2bca813e91..60054b4d9d 100644 --- a/services/horizon/internal/config.go +++ b/services/horizon/internal/config.go @@ -12,6 +12,7 @@ import ( // app's main function and is provided to NewApp. type Config struct { DatabaseURL string + RoDatabaseURL string HistoryArchiveURLs []string Port uint AdminPort uint diff --git a/services/horizon/internal/flags.go b/services/horizon/internal/flags.go index 491be01803..c96ad262ef 100644 --- a/services/horizon/internal/flags.go +++ b/services/horizon/internal/flags.go @@ -93,6 +93,13 @@ func Flags() (*Config, support.ConfigOptions) { Required: true, Usage: "horizon postgres database to connect with", }, + &support.ConfigOption{ + Name: "ro-database-url", + ConfigKey: &config.RoDatabaseURL, + OptType: types.String, + Required: false, + Usage: "horizon postgres read-replica to connect with, when set it will return stale history error when replica is behind primary", + }, &support.ConfigOption{ Name: StellarCoreBinaryPathName, OptType: types.String, diff --git a/services/horizon/internal/httpx/middleware.go b/services/horizon/internal/httpx/middleware.go index b4d77faf81..544900bfb8 100644 --- a/services/horizon/internal/httpx/middleware.go +++ b/services/horizon/internal/httpx/middleware.go @@ -354,3 +354,51 @@ func (m *StateMiddleware) WrapFunc(h http.HandlerFunc) http.HandlerFunc { func (m *StateMiddleware) Wrap(h http.Handler) http.Handler { return m.WrapFunc(h.ServeHTTP) } + +type ReplicaSyncCheckMiddleware struct { + PrimaryHistoryQ *history.Q + ReplicaHistoryQ *history.Q + ServerMetrics *ServerMetrics +} + +// WrapFunc executes the middleware on a given HTTP handler function +func (m *ReplicaSyncCheckMiddleware) WrapFunc(h http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + for attempt := 1; attempt <= 4; attempt++ { + primaryIngestLedger, err := m.PrimaryHistoryQ.GetLastLedgerIngestNonBlocking() + if err != nil { + problem.Render(r.Context(), w, err) + return + } + + replicaIngestLedger, err := m.ReplicaHistoryQ.GetLastLedgerIngestNonBlocking() + if err != nil { + problem.Render(r.Context(), w, err) + return + } + + if replicaIngestLedger >= primaryIngestLedger { + break + } + + switch attempt { + case 1: + time.Sleep(20 * time.Millisecond) + case 2: + time.Sleep(40 * time.Millisecond) + case 3: + time.Sleep(80 * time.Millisecond) + case 4: + problem.Render(r.Context(), w, hProblem.StaleHistory) + m.ServerMetrics.ReplicaLagErrorsCounter.Inc() + return + } + } + + h.ServeHTTP(w, r) + } +} + +func (m *ReplicaSyncCheckMiddleware) Wrap(h http.Handler) http.Handler { + return m.WrapFunc(h.ServeHTTP) +} diff --git a/services/horizon/internal/httpx/router.go b/services/horizon/internal/httpx/router.go index 966d70178c..09049f6727 100644 --- a/services/horizon/internal/httpx/router.go +++ b/services/horizon/internal/httpx/router.go @@ -16,6 +16,7 @@ import ( "github.com/stellar/throttled" "github.com/stellar/go/services/horizon/internal/actions" + "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/services/horizon/internal/ledger" "github.com/stellar/go/services/horizon/internal/paths" "github.com/stellar/go/services/horizon/internal/render/sse" @@ -28,9 +29,10 @@ import ( const maxAssetsForPathFinding = 15 type RouterConfig struct { - DBSession *db.Session - TxSubmitter *txsub.System - RateQuota *throttled.RateQuota + DBSession *db.Session + PrimaryDBSession *db.Session + TxSubmitter *txsub.System + RateQuota *throttled.RateQuota BehindCloudflare bool BehindAWSLoadBalancer bool @@ -99,6 +101,15 @@ func (r *Router) addMiddleware(config *RouterConfig, r.Use(rateLimitter.RateLimit) } + if config.PrimaryDBSession != nil { + replicaSyncMiddleware := ReplicaSyncCheckMiddleware{ + PrimaryHistoryQ: &history.Q{config.PrimaryDBSession}, + ReplicaHistoryQ: &history.Q{config.DBSession}, + ServerMetrics: serverMetrics, + } + r.Use(replicaSyncMiddleware.Wrap) + } + // Internal middlewares r.Internal.Use(chimiddleware.StripSlashes) r.Internal.Use(chimiddleware.RequestID) diff --git a/services/horizon/internal/httpx/server.go b/services/horizon/internal/httpx/server.go index bdcd4a696e..87073ff563 100644 --- a/services/horizon/internal/httpx/server.go +++ b/services/horizon/internal/httpx/server.go @@ -22,7 +22,8 @@ import ( ) type ServerMetrics struct { - RequestDurationSummary *prometheus.SummaryVec + RequestDurationSummary *prometheus.SummaryVec + ReplicaLagErrorsCounter prometheus.Counter } type TLSConfig struct { @@ -67,6 +68,12 @@ func NewServer(serverConfig ServerConfig, routerConfig RouterConfig, ledgerState }, []string{"status", "route", "streaming", "method"}, ), + ReplicaLagErrorsCounter: prometheus.NewCounter( + prometheus.CounterOpts{ + Namespace: "horizon", Subsystem: "http", Name: "replica_lag_errors_count", + Help: "Count of HTTP errors returned due to replica lag", + }, + ), } router, err := NewRouter(&routerConfig, sm, ledgerState) if err != nil { diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index db6a87573b..847651032b 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -43,11 +43,26 @@ func mustInitHorizonDB(app *App) { } } - app.historyQ = &history.Q{mustNewDBSession( - app.config.DatabaseURL, - maxIdle, - maxOpen, - )} + if app.config.RoDatabaseURL == "" { + app.historyQ = &history.Q{mustNewDBSession( + app.config.DatabaseURL, + maxIdle, + maxOpen, + )} + } else { + // If RO set, use it for all DB queries + app.historyQ = &history.Q{mustNewDBSession( + app.config.RoDatabaseURL, + maxIdle, + maxOpen, + )} + + app.primaryHistoryQ = &history.Q{mustNewDBSession( + app.config.DatabaseURL, + maxIdle, + maxOpen, + )} + } } func initIngester(app *App) { @@ -296,6 +311,7 @@ func initTxSubMetrics(app *App) { func initWebMetrics(app *App) { app.prometheusRegistry.MustRegister(app.webServer.Metrics.RequestDurationSummary) + app.prometheusRegistry.MustRegister(app.webServer.Metrics.ReplicaLagErrorsCounter) } func initSubmissionSystem(app *App) { diff --git a/services/horizon/internal/render/problem/problem.go b/services/horizon/internal/render/problem/problem.go index a6278c956f..f1b23b676e 100644 --- a/services/horizon/internal/render/problem/problem.go +++ b/services/horizon/internal/render/problem/problem.go @@ -100,8 +100,8 @@ var ( Status: http.StatusServiceUnavailable, Detail: "This horizon instance is configured to reject client requests " + "when it can determine that the history database is lagging too far " + - "behind the connected instance of stellar-core. If you operate this " + - "server, please ensure that the ingestion system is properly running.", + "behind the connected instance of stellar-core or read replica. Please " + + "try again later.", } // StillIngesting is a well-known problem type. Use it as a shortcut