Skip to content

Commit

Permalink
ReplicaSyncMiddleware
Browse files Browse the repository at this point in the history
  • Loading branch information
bartekn committed May 5, 2021
1 parent 7d7e580 commit 4f8deb9
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 11 deletions.
5 changes: 5 additions & 0 deletions services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type App struct {
config Config
webServer *httpx.Server
historyQ *history.Q
primaryHistoryQ *history.Q
ctx context.Context
cancel func()
horizonVersion string
Expand Down Expand Up @@ -502,6 +503,10 @@ func (a *App) init() error {
},
}

if a.primaryHistoryQ != nil {
routerConfig.PrimaryDBSession = a.primaryHistoryQ.Session
}

var err error
config := httpx.ServerConfig{
Port: uint16(a.config.Port),
Expand Down
1 change: 1 addition & 0 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
// app's main function and is provided to NewApp.
type Config struct {
DatabaseURL string
RoDatabaseURL string
HistoryArchiveURLs []string
Port uint
AdminPort uint
Expand Down
7 changes: 7 additions & 0 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ func Flags() (*Config, support.ConfigOptions) {
Required: true,
Usage: "horizon postgres database to connect with",
},
&support.ConfigOption{
Name: "ro-database-url",
ConfigKey: &config.RoDatabaseURL,
OptType: types.String,
Required: false,
Usage: "horizon postgres read-replica to connect with, when set it will return stale history error when replica is behind primary",
},
&support.ConfigOption{
Name: StellarCoreBinaryPathName,
OptType: types.String,
Expand Down
48 changes: 48 additions & 0 deletions services/horizon/internal/httpx/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,3 +354,51 @@ func (m *StateMiddleware) WrapFunc(h http.HandlerFunc) http.HandlerFunc {
func (m *StateMiddleware) Wrap(h http.Handler) http.Handler {
return m.WrapFunc(h.ServeHTTP)
}

type ReplicaSyncCheckMiddleware struct {
PrimaryHistoryQ *history.Q
ReplicaHistoryQ *history.Q
ServerMetrics *ServerMetrics
}

// WrapFunc executes the middleware on a given HTTP handler function
func (m *ReplicaSyncCheckMiddleware) WrapFunc(h http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
for attempt := 1; attempt <= 4; attempt++ {
primaryIngestLedger, err := m.PrimaryHistoryQ.GetLastLedgerIngestNonBlocking()
if err != nil {
problem.Render(r.Context(), w, err)
return
}

replicaIngestLedger, err := m.ReplicaHistoryQ.GetLastLedgerIngestNonBlocking()
if err != nil {
problem.Render(r.Context(), w, err)
return
}

if replicaIngestLedger >= primaryIngestLedger {
break
}

switch attempt {
case 1:
time.Sleep(20 * time.Millisecond)
case 2:
time.Sleep(40 * time.Millisecond)
case 3:
time.Sleep(80 * time.Millisecond)
case 4:
problem.Render(r.Context(), w, hProblem.StaleHistory)
m.ServerMetrics.ReplicaLagErrorsCounter.Inc()
return
}
}

h.ServeHTTP(w, r)
}
}

func (m *ReplicaSyncCheckMiddleware) Wrap(h http.Handler) http.Handler {
return m.WrapFunc(h.ServeHTTP)
}
17 changes: 14 additions & 3 deletions services/horizon/internal/httpx/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/stellar/throttled"

"github.com/stellar/go/services/horizon/internal/actions"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/ledger"
"github.com/stellar/go/services/horizon/internal/paths"
"github.com/stellar/go/services/horizon/internal/render/sse"
Expand All @@ -28,9 +29,10 @@ import (
const maxAssetsForPathFinding = 15

type RouterConfig struct {
DBSession *db.Session
TxSubmitter *txsub.System
RateQuota *throttled.RateQuota
DBSession *db.Session
PrimaryDBSession *db.Session
TxSubmitter *txsub.System
RateQuota *throttled.RateQuota

BehindCloudflare bool
BehindAWSLoadBalancer bool
Expand Down Expand Up @@ -99,6 +101,15 @@ func (r *Router) addMiddleware(config *RouterConfig,
r.Use(rateLimitter.RateLimit)
}

if config.PrimaryDBSession != nil {
replicaSyncMiddleware := ReplicaSyncCheckMiddleware{
PrimaryHistoryQ: &history.Q{config.PrimaryDBSession},
ReplicaHistoryQ: &history.Q{config.DBSession},
ServerMetrics: serverMetrics,
}
r.Use(replicaSyncMiddleware.Wrap)
}

// Internal middlewares
r.Internal.Use(chimiddleware.StripSlashes)
r.Internal.Use(chimiddleware.RequestID)
Expand Down
9 changes: 8 additions & 1 deletion services/horizon/internal/httpx/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
)

type ServerMetrics struct {
RequestDurationSummary *prometheus.SummaryVec
RequestDurationSummary *prometheus.SummaryVec
ReplicaLagErrorsCounter prometheus.Counter
}

type TLSConfig struct {
Expand Down Expand Up @@ -67,6 +68,12 @@ func NewServer(serverConfig ServerConfig, routerConfig RouterConfig, ledgerState
},
[]string{"status", "route", "streaming", "method"},
),
ReplicaLagErrorsCounter: prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "horizon", Subsystem: "http", Name: "replica_lag_errors_count",
Help: "Count of HTTP errors returned due to replica lag",
},
),
}
router, err := NewRouter(&routerConfig, sm, ledgerState)
if err != nil {
Expand Down
26 changes: 21 additions & 5 deletions services/horizon/internal/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,26 @@ func mustInitHorizonDB(app *App) {
}
}

app.historyQ = &history.Q{mustNewDBSession(
app.config.DatabaseURL,
maxIdle,
maxOpen,
)}
if app.config.RoDatabaseURL == "" {
app.historyQ = &history.Q{mustNewDBSession(
app.config.DatabaseURL,
maxIdle,
maxOpen,
)}
} else {
// If RO set, use it for all DB queries
app.historyQ = &history.Q{mustNewDBSession(
app.config.RoDatabaseURL,
maxIdle,
maxOpen,
)}

app.primaryHistoryQ = &history.Q{mustNewDBSession(
app.config.DatabaseURL,
maxIdle,
maxOpen,
)}
}
}

func initIngester(app *App) {
Expand Down Expand Up @@ -296,6 +311,7 @@ func initTxSubMetrics(app *App) {

func initWebMetrics(app *App) {
app.prometheusRegistry.MustRegister(app.webServer.Metrics.RequestDurationSummary)
app.prometheusRegistry.MustRegister(app.webServer.Metrics.ReplicaLagErrorsCounter)
}

func initSubmissionSystem(app *App) {
Expand Down
4 changes: 2 additions & 2 deletions services/horizon/internal/render/problem/problem.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ var (
Status: http.StatusServiceUnavailable,
Detail: "This horizon instance is configured to reject client requests " +
"when it can determine that the history database is lagging too far " +
"behind the connected instance of stellar-core. If you operate this " +
"server, please ensure that the ingestion system is properly running.",
"behind the connected instance of stellar-core or read replica. Please " +
"try again later.",
}

// StillIngesting is a well-known problem type. Use it as a shortcut
Expand Down

0 comments on commit 4f8deb9

Please sign in to comment.