-
Notifications
You must be signed in to change notification settings - Fork 501
/
middleware.go
398 lines (349 loc) · 12.1 KB
/
middleware.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
package horizon
import (
"context"
"database/sql"
"net/http"
"strings"
"time"
"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
"github.com/stellar/go/services/horizon/internal/actions"
horizonContext "github.com/stellar/go/services/horizon/internal/context"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/errors"
"github.com/stellar/go/services/horizon/internal/expingest"
"github.com/stellar/go/services/horizon/internal/hchi"
"github.com/stellar/go/services/horizon/internal/httpx"
"github.com/stellar/go/services/horizon/internal/ledger"
"github.com/stellar/go/services/horizon/internal/render"
hProblem "github.com/stellar/go/services/horizon/internal/render/problem"
"github.com/stellar/go/support/db"
supportErrors "github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/support/render/problem"
)
// appContextMiddleware builds a middleware that attaches app to the context of
// every request (via withAppContext), so that subsequent middlewares and
// handlers can retrieve the horizon.App instance from the request context.
func appContextMiddleware(app *App) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		serve := func(w http.ResponseWriter, r *http.Request) {
			req := r.WithContext(withAppContext(r.Context(), app))
			next.ServeHTTP(w, req)
		}
		return http.HandlerFunc(serve)
	}
}
// requestCacheHeadersMiddleware sets the Cache-Control header on every
// response before passing the request on to h.
func requestCacheHeadersMiddleware(h http.Handler) http.Handler {
	// Before changing this value, read the Stack Overflow answer about stalled
	// requests in older versions of Chrome:
	// https://stackoverflow.com/questions/27513994/chrome-stalls-when-making-multiple-requests-to-same-resource
	const cacheControl = "no-cache, no-store, max-age=0"

	serve := func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Cache-Control", cacheControl)
		h.ServeHTTP(w, r)
	}
	return http.HandlerFunc(serve)
}
// contextMiddleware derives a new request context by passing the incoming one
// through hchi.WithChiRequestID and then httpx.RequestContext, and serves next
// with a request carrying that derived context.
func contextMiddleware(next http.Handler) http.Handler {
	serve := func(w http.ResponseWriter, r *http.Request) {
		withRequestID := hchi.WithChiRequestID(r.Context())
		requestCtx := httpx.RequestContext(withRequestID, w, r)
		next.ServeHTTP(w, r.WithContext(requestCtx))
	}
	return http.HandlerFunc(serve)
}
// Names under which a client may identify itself and its application.
// getClientData looks each name up first as a request header and then as a
// URL query parameter; the values end up in the request log entries.
const (
// clientNameHeader carries the name of the client making the request.
clientNameHeader = "X-Client-Name"
// clientVersionHeader carries the version of the client making the request.
clientVersionHeader = "X-Client-Version"
// appNameHeader carries the name of the application using the client.
appNameHeader = "X-App-Name"
// appVersionHeader carries the version of the application using the client.
appVersionHeader = "X-App-Version"
)
// newWrapResponseWriter returns w as a middleware.WrapResponseWriter. If w
// already implements that interface it is returned unchanged, so stacked
// middlewares share one wrapper; otherwise w is wrapped using the protocol
// major version of r.
func newWrapResponseWriter(w http.ResponseWriter, r *http.Request) middleware.WrapResponseWriter {
	if wrapped, alreadyWrapped := w.(middleware.WrapResponseWriter); alreadyWrapped {
		return wrapped
	}
	return middleware.NewWrapResponseWriter(w, r.ProtoMajor)
}
// loggerMiddleware logs HTTP requests and responses to the logging subsystem
// of horizon. It installs a logger tagged with the request ID into the request
// context, logs the start of the request, serves it, and then logs its end
// together with the time the handler took.
func loggerMiddleware(h http.Handler) http.Handler {
	serve := func(w http.ResponseWriter, r *http.Request) {
		wrapped := newWrapResponseWriter(w, r)

		requestLogger := log.WithField("req", middleware.GetReqID(r.Context()))
		ctx := log.Set(r.Context(), requestLogger)

		// Streaming is detected from the `Accept` header of the request rather
		// than from the response: if the streaming connection is reset before
		// the first event is sent, no Content-Type header is sent in a response.
		streaming := strings.Contains(r.Header.Get("Accept"), render.MimeEventStream)

		logStartOfRequest(ctx, r, streaming)

		started := time.Now()
		h.ServeHTTP(wrapped, r.WithContext(ctx))
		elapsed := time.Since(started)

		logEndOfRequest(ctx, r, elapsed, wrapped, streaming)
	}
	return http.HandlerFunc(serve)
}
// timeoutMiddleware returns a middleware that serves non-POST requests with a
// context that expires after timeout. When the deadline has passed by the time
// the handler returns and no status has been written yet, it responds with
// 504 Gateway Timeout.
func timeoutMiddleware(timeout time.Duration) func(next http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			wrapped := newWrapResponseWriter(w, r)
			timeoutCtx, cancel := context.WithTimeout(r.Context(), timeout)

			defer func() {
				// Release the timer first; Err() still reports
				// DeadlineExceeded if the deadline had already passed.
				cancel()
				deadlinePassed := timeoutCtx.Err() == context.DeadlineExceeded
				// Only write the header if it hasn't been written yet.
				if deadlinePassed && wrapped.Status() == 0 {
					wrapped.WriteHeader(http.StatusGatewayTimeout)
				}
			}()

			// txsub has a custom timeout, so POST requests keep their
			// original context.
			req := r
			if r.Method != http.MethodPost {
				req = r.WithContext(timeoutCtx)
			}
			next.ServeHTTP(wrapped, req)
		})
	}
}
// getClientData returns a piece of client data (such as a name or version)
// identified by headerName. The request header is consulted first; if it is
// empty, the URL query parameter with the same name is used (useful when
// headers cannot be set, like in EventStream). When neither is present the
// string "undefined" is returned.
func getClientData(r *http.Request, headerName string) string {
	if fromHeader := r.Header.Get(headerName); fromHeader != "" {
		return fromHeader
	}
	if fromQuery := r.URL.Query().Get(headerName); fromQuery != "" {
		return fromQuery
	}
	return "undefined"
}
// logStartOfRequest writes the "Starting request" info entry to the logger
// stored in ctx. The entry describes the client, the origin and the target of
// r, and whether the request is a streaming one. A missing referer is logged
// as "undefined".
func logStartOfRequest(ctx context.Context, r *http.Request, streaming bool) {
	referer := "undefined"
	if fromRequest := r.Referer(); fromRequest != "" {
		referer = fromRequest
	}

	fields := log.F{
		"client_name":    getClientData(r, clientNameHeader),
		"client_version": getClientData(r, clientVersionHeader),
		"app_name":       getClientData(r, appNameHeader),
		"app_version":    getClientData(r, appVersionHeader),
		"forwarded_ip":   firstXForwardedFor(r),
		"host":           r.Host,
		"ip":             remoteAddrIP(r),
		"ip_port":        r.RemoteAddr,
		"method":         r.Method,
		"path":           r.URL.String(),
		"streaming":      streaming,
		"referer":        referer,
	}
	log.Ctx(ctx).WithFields(fields).Info("Starting request")
}
// logEndOfRequest writes the "Finished request" info entry to the logger
// stored in ctx. In addition to the fields logged at the start of the request
// it records the number of bytes written and the status reported by mw, the
// handling duration in seconds, and the matched chi route pattern. A missing
// referer or route pattern is logged as "undefined".
func logEndOfRequest(ctx context.Context, r *http.Request, duration time.Duration, mw middleware.WrapResponseWriter, streaming bool) {
	// The route pattern can be empty when the request did not reach the final
	// route (e.g. it was blocked by a middleware). More info:
	// https://github.com/go-chi/chi/issues/270
	//
	// chi.RouteContext yields a nil *chi.Context when the request context
	// carries no chi routing context, so check it before asking for the
	// pattern instead of risking a nil dereference.
	routePattern := "undefined"
	if routeCtx := chi.RouteContext(r.Context()); routeCtx != nil {
		if pattern := routeCtx.RoutePattern(); pattern != "" {
			routePattern = pattern
		}
	}

	referer := r.Referer()
	if referer == "" {
		referer = "undefined"
	}

	log.Ctx(ctx).WithFields(log.F{
		"bytes":          mw.BytesWritten(),
		"client_name":    getClientData(r, clientNameHeader),
		"client_version": getClientData(r, clientVersionHeader),
		"app_name":       getClientData(r, appNameHeader),
		"app_version":    getClientData(r, appVersionHeader),
		"duration":       duration.Seconds(),
		"forwarded_ip":   firstXForwardedFor(r),
		"host":           r.Host,
		"ip":             remoteAddrIP(r),
		"ip_port":        r.RemoteAddr,
		"method":         r.Method,
		"path":           r.URL.String(),
		"route":          routePattern,
		"status":         mw.Status(),
		"streaming":      streaming,
		"referer":        referer,
	}).Info("Finished request")
}
// firstXForwardedFor returns the first entry of the comma-separated
// X-Forwarded-For request header with surrounding whitespace removed. It
// returns an empty string when the header is absent.
func firstXForwardedFor(r *http.Request) string {
	forwarded := r.Header.Get("X-Forwarded-For")
	if comma := strings.IndexByte(forwarded, ','); comma >= 0 {
		forwarded = forwarded[:comma]
	}
	return strings.TrimSpace(forwarded)
}
// RateLimitMiddleware wraps next with the rate limiter configured on w. When
// no rate limiter is configured, next is returned unchanged.
func (w *web) RateLimitMiddleware(next http.Handler) http.Handler {
	if w.rateLimiter != nil {
		return w.rateLimiter.RateLimit(next)
	}
	return next
}
// recoverMiddleware helps the server recover from panics raised while serving
// a request, so that a single request cannot bring down the horizon server.
// A recovered panic is converted into an error, reported to Sentry and
// rendered to the client as a problem response.
func recoverMiddleware(h http.Handler) http.Handler {
	serve := func(w http.ResponseWriter, r *http.Request) {
		ctx := r.Context()

		defer func() {
			rec := recover()
			if rec == nil {
				return
			}
			err := errors.FromPanic(rec)
			errors.ReportToSentry(err, r)
			problem.Render(ctx, w, err)
		}()

		h.ServeHTTP(w, r)
	}
	return http.HandlerFunc(serve)
}
// requestMetricsMiddleware times every request with the app's request timer
// and, based on the response status, marks either the success meter or the
// failure meter. Statuses outside [200, 600) are not counted.
func requestMetricsMiddleware(h http.Handler) http.Handler {
	serve := func(w http.ResponseWriter, r *http.Request) {
		app := AppFromContext(r.Context())
		wrapped := newWrapResponseWriter(w, r)

		app.web.requestTimer.Time(func() {
			h.ServeHTTP(wrapped, r)
		})

		switch status := wrapped.Status(); {
		case 200 <= status && status < 400:
			// a success is in [200, 400)
			app.web.successMeter.Mark(1)
		case 400 <= status && status < 600:
			// a failure is in [400, 600)
			app.web.failureMeter.Mark(1)
		}
	}
	return http.HandlerFunc(serve)
}
// NewHistoryMiddleware returns a middleware that adds a per-request clone of
// session to the request context and ensures Horizon is not in a stale state.
// Horizon is stale when the difference between the latest core ledger and the
// latest history ledger is higher than staleThreshold; in that case the
// StaleHistory problem is rendered and the wrapped handler is not called.
// A staleThreshold of zero or less disables the check.
func NewHistoryMiddleware(staleThreshold int32, session *db.Session) func(http.Handler) http.Handler {
	return func(h http.Handler) http.Handler {
		serve := func(w http.ResponseWriter, r *http.Request) {
			if staleThreshold > 0 {
				state := ledger.CurrentState()
				if lag := state.CoreLatest - state.HistoryLatest; lag > staleThreshold {
					// Work on a copy so the shared problem value is not modified.
					stale := hProblem.StaleHistory
					stale.Extras = map[string]interface{}{
						"history_latest_ledger": state.HistoryLatest,
						"core_latest_ledger":    state.CoreLatest,
					}
					problem.Render(r.Context(), w, stale)
					return
				}
			}

			perRequest := session.Clone()
			perRequest.Ctx = r.Context()

			ctx := context.WithValue(r.Context(), &horizonContext.SessionContextKey, perRequest)
			h.ServeHTTP(w, r.WithContext(ctx))
		}
		return http.HandlerFunc(serve)
	}
}
// StateMiddleware is a middleware which enables a state handler if the state
// has been initialized.
// Unless NoStateVerification is set, it ensures that the state (ledger entries)
// has been verified and is correct (otherwise it returns `500 Internal Server Error`
// to prevent returning invalid data to the user) and that ingestion has caught
// up (otherwise it returns the StillIngesting problem).
type StateMiddleware struct {
// HorizonSession is the database session that WrapFunc clones for every
// request it handles.
HorizonSession *db.Session
// NoStateVerification, when true, makes WrapFunc skip both the
// GetExpStateInvalid check and the ingestion readiness check.
NoStateVerification bool
}
// ingestionStatus reports the last ledger ingested by the experimental
// ingestion system and whether the ingested state is ready to be served.
// The state is ready when the stored ingestion version equals
// expingest.CurrentVersion, at least one ledger has been ingested, and the
// last ingested ledger equals the latest history ledger.
// On any query failure it returns (0, false, err) with err wrapped to name
// the query that failed.
func ingestionStatus(q *history.Q) (uint32, bool, error) {
	version, err := q.GetExpIngestVersion()
	if err != nil {
		return 0, false, supportErrors.Wrap(err, "Error running GetExpIngestVersion")
	}

	lastIngested, err := q.GetLastLedgerExpIngestNonBlocking()
	if err != nil {
		return 0, false, supportErrors.Wrap(err, "Error running GetLastLedgerExpIngestNonBlocking")
	}

	var lastHistory uint32
	if err = q.LatestLedger(&lastHistory); err != nil {
		return 0, false, supportErrors.Wrap(err, "Error running LatestLedger")
	}

	versionMatches := version == expingest.CurrentVersion
	caughtUp := lastIngested > 0 && lastIngested == lastHistory
	return lastIngested, versionMatches && caughtUp, nil
}
// WrapFunc executes the middleware on a given HTTP handler function.
//
// For every request it clones HorizonSession, opens a read-only repeatable
// read transaction on the clone, and then:
//   - unless NoStateVerification is set, renders an error if the state is
//     marked invalid (problem.ServerError) or if ingestion is not ready yet
//     (hProblem.StillIngesting);
//   - for SSE requests, rolls the transaction back before calling h;
//   - for all other requests, sets the last ledger header on the response.
//
// Finally h is called with the session stored in the request context under
// horizonContext.SessionContextKey.
func (m *StateMiddleware) WrapFunc(h http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// q shares the cloned session, so its queries run inside the transaction
// started below.
session := m.HorizonSession.Clone()
q := &history.Q{session}
sseRequest := render.Negotiate(r) == render.MimeEventStream
// We want to start a repeatable read session to ensure that the data we
// fetch from the db belongs to the same ledger.
// Otherwise, because the ingestion system is running concurrently with this request,
// it is possible to have one read fetch data from ledger N and another read
// fetch data from ledger N+1.
session.Ctx = r.Context()
err := session.BeginTx(&sql.TxOptions{
Isolation: sql.LevelRepeatableRead,
ReadOnly: true,
})
if err != nil {
err = supportErrors.Wrap(err, "Error starting exp ingestion read transaction")
problem.Render(r.Context(), w, err)
return
}
// Always release the transaction when the handler returns. The result of
// this deferred Rollback is ignored; it also runs after the explicit
// rollback performed for SSE requests below.
defer session.Rollback()
if !m.NoStateVerification {
stateInvalid, invalidErr := q.GetExpStateInvalid()
if invalidErr != nil {
invalidErr = supportErrors.Wrap(invalidErr, "Error running GetExpStateInvalid")
problem.Render(r.Context(), w, invalidErr)
return
}
// The state has been marked invalid: refuse to serve possibly incorrect data.
if stateInvalid {
problem.Render(r.Context(), w, problem.ServerError)
return
}
}
lastIngestedLedger, ready, err := ingestionStatus(q)
if err != nil {
problem.Render(r.Context(), w, err)
return
}
// The readiness check is skipped as well when state verification is disabled.
if !m.NoStateVerification && !ready {
problem.Render(r.Context(), w, hProblem.StillIngesting)
return
}
// For SSE requests we need to discard the repeatable read transaction;
// otherwise, the stream will not pick up updates occurring in future
// ledgers.
if sseRequest {
if err = session.Rollback(); err != nil {
problem.Render(
r.Context(),
w,
supportErrors.Wrap(
err,
"Could not roll back repeatable read session for SSE request",
),
)
return
}
} else {
// Non-streaming responses report the ledger the data was read at.
actions.SetLastLedgerHeader(w, lastIngestedLedger)
}
// Hand the session to the wrapped handler through the request context.
h.ServeHTTP(w, r.WithContext(
context.WithValue(
r.Context(),
&horizonContext.SessionContextKey,
session,
),
))
}
}
// Wrap executes the middleware on a given HTTP handler. It is the
// http.Handler counterpart of WrapFunc and delegates to it.
func (m *StateMiddleware) Wrap(h http.Handler) http.Handler {
	var wrapped http.Handler = m.WrapFunc(h.ServeHTTP)
	return wrapped
}