graphql, node, rpc: improve HTTP write timeout handling (ethereum#25457)
Here we add special handling for sending an error response when the write timeout of the
HTTP server is just about to expire. This is surprisingly difficult to get right, since it
must be ensured that all output is fully flushed in time, which needs support from
multiple levels of the RPC handler stack:

The timeout response can't use chunked transfer-encoding because there is no way to write
the final terminating chunk. net/http writes it when the topmost handler returns, but the
timeout will already be over by the time that happens. We decided to disable chunked
encoding by setting content-length explicitly.

Gzip compression must also be disabled for timeout responses because we don't know the
true content-length before compressing all output, i.e. compression would reintroduce
chunked transfer-encoding.
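As an illustration of the approach (not code from this commit; the handler, helper name, and listen address below are assumptions), a minimal net/http sketch of such a late error response looks like this: an explicit Content-Length keeps the response out of chunked encoding, and Transfer-Encoding: identity tells a wrapping gzip handler, and net/http itself, to leave the body alone.

```go
package main

import (
	"log"
	"net/http"
	"strconv"
	"time"
)

// writeTimeoutError is a hypothetical helper, not code from this commit. It
// flushes a JSON error shortly before the server's write deadline in a form
// that needs no further output from the handler stack.
func writeTimeoutError(w http.ResponseWriter) {
	body := []byte(`{"errors":[{"message":"request timed out"}]}`)

	// "identity" tells a wrapping gzip handler (and net/http) not to re-encode
	// the body; net/http strips the header before sending the response.
	w.Header().Set("Transfer-Encoding", "identity")
	// An explicit Content-Length prevents chunked encoding, so no terminating
	// chunk has to be written after the handler returns.
	w.Header().Set("Content-Type", "application/json")
	w.Header().Set("Content-Length", strconv.Itoa(len(body)))
	w.Write(body)

	// Flush now; the write deadline may expire before the handler stack unwinds.
	if f, ok := w.(http.Flusher); ok {
		f.Flush()
	}
}

func main() {
	srv := &http.Server{
		Addr:         "127.0.0.1:8545", // illustrative address
		WriteTimeout: 5 * time.Second,
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			// In a real server this would run when request handling is about
			// to exceed WriteTimeout; here we respond with the error directly.
			writeTimeoutError(w)
		}),
	}
	log.Fatal(srv.ListenAndServe())
}
```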
s1na authored and shekhirin committed Jun 6, 2023
1 parent 0802427 commit 7c621f5
Showing 16 changed files with 606 additions and 80 deletions.
9 changes: 5 additions & 4 deletions graphql/graphql_test.go
@@ -321,10 +321,11 @@ func TestGraphQLTransactionLogs(t *testing.T) {

func createNode(t *testing.T) *node.Node {
stack, err := node.New(&node.Config{
HTTPHost: "127.0.0.1",
HTTPPort: 0,
WSHost: "127.0.0.1",
WSPort: 0,
HTTPHost: "127.0.0.1",
HTTPPort: 0,
WSHost: "127.0.0.1",
WSPort: 0,
HTTPTimeouts: node.DefaultConfig.HTTPTimeouts,
})
if err != nil {
t.Fatalf("could not create node: %v", err)
65 changes: 54 additions & 11 deletions graphql/service.go
@@ -20,12 +20,16 @@ import (
"context"
"encoding/json"
"net/http"
"strconv"
"sync"
"time"

"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
"github.com/graph-gophers/graphql-go"
gqlErrors "github.com/graph-gophers/graphql-go/errors"
)

type handler struct {
@@ -43,21 +47,60 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}

ctx, cancel := context.WithTimeout(r.Context(), 60*time.Second)
var (
ctx = r.Context()
responded sync.Once
timer *time.Timer
cancel context.CancelFunc
)
ctx, cancel = context.WithCancel(ctx)
defer cancel()

response := h.Schema.Exec(ctx, params.Query, params.OperationName, params.Variables)
responseJSON, err := json.Marshal(response)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if len(response.Errors) > 0 {
w.WriteHeader(http.StatusBadRequest)
if timeout, ok := rpc.ContextRequestTimeout(ctx); ok {
timer = time.AfterFunc(timeout, func() {
responded.Do(func() {
// Cancel request handling.
cancel()

// Create the timeout response.
response := &graphql.Response{
Errors: []*gqlErrors.QueryError{{Message: "request timed out"}},
}
responseJSON, err := json.Marshal(response)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

// Setting this disables gzip compression in package node.
w.Header().Set("transfer-encoding", "identity")

// Flush the response. Since we are writing close to the response timeout,
// chunked transfer encoding must be disabled by setting content-length.
w.Header().Set("content-type", "application/json")
w.Header().Set("content-length", strconv.Itoa(len(responseJSON)))
w.Write(responseJSON)
if flush, ok := w.(http.Flusher); ok {
flush.Flush()
}
})
})
}

w.Header().Set("Content-Type", "application/json")
w.Write(responseJSON)
response := h.Schema.Exec(ctx, params.Query, params.OperationName, params.Variables)
if timer != nil {
	timer.Stop()
}
responded.Do(func() {
responseJSON, err := json.Marshal(response)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if len(response.Errors) > 0 {
w.WriteHeader(http.StatusBadRequest)
}
w.Header().Set("Content-Type", "application/json")
w.Write(responseJSON)
})
}

// New constructs a new GraphQL service instance.
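The rewritten handler coordinates two potential writers of the response: the timer callback that fires near the write deadline, and the normal path after Schema.Exec returns. A sync.Once guarantees that exactly one of them writes, and the shared cancel aborts the in-flight query when the timer wins. Below is a stripped-down sketch of that pattern; runWithDeadline, query, and the fixed timeout are illustrative assumptions (the real handler derives its timeout via rpc.ContextRequestTimeout).

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runWithDeadline mirrors the once/timer/cancel coordination used by the
// GraphQL handler above. Names and the fixed timeout are illustrative.
func runWithDeadline(timeout time.Duration, query func(ctx context.Context) string) string {
	var (
		result    string
		responded sync.Once
	)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// If the deadline hits first, cancel the work and record the error reply.
	timer := time.AfterFunc(timeout, func() {
		responded.Do(func() {
			cancel()
			result = `{"errors":[{"message":"request timed out"}]}`
		})
	})

	out := query(ctx)
	timer.Stop()

	// If the timer already fired, this Do is a no-op and the timeout reply wins.
	responded.Do(func() {
		result = out
	})
	return result
}

func main() {
	got := runWithDeadline(100*time.Millisecond, func(ctx context.Context) string {
		select {
		case <-time.After(time.Second): // simulate a slow query
			return `{"data":{}}`
		case <-ctx.Done():
			return "" // aborted by the timeout path
		}
	})
	fmt.Println(got) // prints the "request timed out" error document
}
```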
3 changes: 3 additions & 0 deletions node/api_test.go
@@ -252,6 +252,9 @@ func TestStartRPC(t *testing.T) {
config := test.cfg
// config.Logger = testlog.Logger(t, log.LvlDebug)
config.P2P.NoDiscovery = true
if config.HTTPTimeouts == (rpc.HTTPTimeouts{}) {
config.HTTPTimeouts = rpc.DefaultHTTPTimeouts
}

// Create Node.
stack, err := New(&config)
13 changes: 7 additions & 6 deletions node/node_test.go
@@ -559,13 +559,13 @@ func (test rpcPrefixTest) check(t *testing.T, node *Node) {
}

for _, path := range test.wantHTTP {
resp := rpcRequest(t, httpBase+path)
resp := rpcRequest(t, httpBase+path, testMethod)
if resp.StatusCode != 200 {
t.Errorf("Error: %s: bad status code %d, want 200", path, resp.StatusCode)
}
}
for _, path := range test.wantNoHTTP {
resp := rpcRequest(t, httpBase+path)
resp := rpcRequest(t, httpBase+path, testMethod)
if resp.StatusCode != 404 {
t.Errorf("Error: %s: bad status code %d, want 404", path, resp.StatusCode)
}
@@ -586,10 +586,11 @@ func (test rpcPrefixTest) check(t *testing.T, node *Node) {

func createNode(t *testing.T, httpPort, wsPort int) *Node {
conf := &Config{
HTTPHost: "127.0.0.1",
HTTPPort: httpPort,
WSHost: "127.0.0.1",
WSPort: wsPort,
HTTPHost: "127.0.0.1",
HTTPPort: httpPort,
WSHost: "127.0.0.1",
WSPort: wsPort,
HTTPTimeouts: rpc.DefaultHTTPTimeouts,
}
node, err := New(conf)
if err != nil {
100 changes: 87 additions & 13 deletions node/rpcstack.go
@@ -24,6 +24,7 @@ import (
"net"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
@@ -196,6 +197,7 @@ func (h *httpServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
return
}

// if http-rpc is enabled, try to serve request
rpc := h.httpHandler.Load().(*rpcHandler)
if rpc != nil {
@@ -462,17 +464,94 @@ var gzPool = sync.Pool{
}

type gzipResponseWriter struct {
io.Writer
http.ResponseWriter
resp http.ResponseWriter

gz *gzip.Writer
contentLength uint64 // total length of the uncompressed response
written uint64 // amount of written bytes from the uncompressed response
hasLength bool // true if uncompressed response had Content-Length
inited bool // true after init was called for the first time
}

// init runs just before response headers are written. Among other things, this function
// also decides whether compression will be applied at all.
func (w *gzipResponseWriter) init() {
if w.inited {
return
}
w.inited = true

hdr := w.resp.Header()
length := hdr.Get("content-length")
if len(length) > 0 {
if n, err := strconv.ParseUint(length, 10, 64); err == nil {
w.hasLength = true
w.contentLength = n
}
}

// Setting Transfer-Encoding to "identity" explicitly disables compression. net/http
// also recognizes this header value and uses it to disable "chunked" transfer
// encoding, trimming the header from the response. This means downstream handlers can
// set this without harm, even if they aren't wrapped by newGzipHandler.
//
// In go-ethereum, we use this signal to disable compression for certain error
// responses which are flushed out close to the write deadline of the response. For
// these cases, we want to avoid chunked transfer encoding and compression because
// they require additional output that may not get written in time.
passthrough := hdr.Get("transfer-encoding") == "identity"
if !passthrough {
w.gz = gzPool.Get().(*gzip.Writer)
w.gz.Reset(w.resp)
hdr.Del("content-length")
hdr.Set("content-encoding", "gzip")
}
}

func (w *gzipResponseWriter) Header() http.Header {
return w.resp.Header()
}

func (w *gzipResponseWriter) WriteHeader(status int) {
w.Header().Del("Content-Length")
w.ResponseWriter.WriteHeader(status)
w.init()
w.resp.WriteHeader(status)
}

func (w *gzipResponseWriter) Write(b []byte) (int, error) {
return w.Writer.Write(b)
w.init()

if w.gz == nil {
// Compression is disabled.
return w.resp.Write(b)
}

n, err := w.gz.Write(b)
w.written += uint64(n)
if w.hasLength && w.written >= w.contentLength {
// The HTTP handler has finished writing the entire uncompressed response. Close
// the gzip stream to ensure the footer will be seen by the client in case the
// response is flushed after this call to write.
err = w.gz.Close()
}
return n, err
}

func (w *gzipResponseWriter) Flush() {
if w.gz != nil {
w.gz.Flush()
}
if f, ok := w.resp.(http.Flusher); ok {
f.Flush()
}
}

func (w *gzipResponseWriter) close() {
if w.gz == nil {
return
}
w.gz.Close()
gzPool.Put(w.gz)
w.gz = nil
}

func newGzipHandler(next http.Handler) http.Handler {
@@ -482,15 +561,10 @@ func newGzipHandler(next http.Handler) http.Handler {
return
}

w.Header().Set("Content-Encoding", "gzip")

gz := gzPool.Get().(*gzip.Writer)
defer gzPool.Put(gz)

gz.Reset(w)
defer gz.Close()
wrapper := &gzipResponseWriter{resp: w}
defer wrapper.close()

next.ServeHTTP(&gzipResponseWriter{ResponseWriter: w, Writer: gz}, r)
next.ServeHTTP(wrapper, r)
})
}
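The reworked gzip middleware compresses responses for clients that send Accept-Encoding: gzip, unless the wrapped handler opts out by setting Transfer-Encoding: identity before its first write. The test-style sketch below illustrates both paths; it is hypothetical, assumes placement in package node (newGzipHandler is unexported), and its handler body and assertions are not part of this commit.

```go
package node

import (
	"compress/gzip"
	"io"
	"net/http"
	"net/http/httptest"
	"testing"
)

// TestGzipHandlerSketch is a hypothetical test, not part of this commit.
func TestGzipHandlerSketch(t *testing.T) {
	handler := newGzipHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path == "/plain" {
			// Opt out of compression, as the timeout error responses do.
			w.Header().Set("Transfer-Encoding", "identity")
		}
		w.Write([]byte(`{"ok":true}`))
	}))
	srv := httptest.NewServer(handler)
	defer srv.Close()

	get := func(path string) *http.Response {
		req, _ := http.NewRequest("GET", srv.URL+path, nil)
		// Setting Accept-Encoding manually disables the transport's transparent
		// decompression, so the raw Content-Encoding header stays visible.
		req.Header.Set("Accept-Encoding", "gzip")
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			t.Fatal(err)
		}
		return resp
	}

	// Default path: the wrapper compresses the body.
	resp := get("/")
	defer resp.Body.Close()
	if resp.Header.Get("Content-Encoding") != "gzip" {
		t.Fatalf("want gzipped response, got Content-Encoding %q", resp.Header.Get("Content-Encoding"))
	}
	gz, err := gzip.NewReader(resp.Body)
	if err != nil {
		t.Fatal(err)
	}
	if body, _ := io.ReadAll(gz); string(body) != `{"ok":true}` {
		t.Fatalf("unexpected body %q", body)
	}

	// Opt-out path: the body passes through uncompressed.
	resp2 := get("/plain")
	defer resp2.Body.Close()
	if resp2.Header.Get("Content-Encoding") == "gzip" {
		t.Fatal("expected uncompressed response")
	}
	if body, _ := io.ReadAll(resp2.Body); string(body) != `{"ok":true}` {
		t.Fatalf("unexpected body %q", body)
	}
}
```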

