Skip to content

Commit

Permalink
Remove custom http handling code to support keep-alives
Browse files Browse the repository at this point in the history
  • Loading branch information
zquestz committed Jul 17, 2020
1 parent ef872e9 commit c196d09
Showing 1 changed file with 21 additions and 105 deletions.
126 changes: 21 additions & 105 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"math"
"math/big"
Expand Down Expand Up @@ -3997,61 +3996,6 @@ type rpcServer struct {
quit chan int
}

// httpStatusLine returns a response Status-Line (RFC 2616 Section 6.1)
// for the given request and response status code. This function was lifted and
// adapted from the standard library HTTP server code since it's not exported.
func (s *rpcServer) httpStatusLine(req *http.Request, code int) string {
// Fast path:
key := code
proto11 := req.ProtoAtLeast(1, 1)
if !proto11 {
key = -key
}
s.statusLock.RLock()
line, ok := s.statusLines[key]
s.statusLock.RUnlock()
if ok {
return line
}

// Slow path:
proto := "HTTP/1.0"
if proto11 {
proto = "HTTP/1.1"
}
codeStr := strconv.Itoa(code)
text := http.StatusText(code)
if text != "" {
line = proto + " " + codeStr + " " + text + "\r\n"
s.statusLock.Lock()
s.statusLines[key] = line
s.statusLock.Unlock()
} else {
text = "status code " + codeStr
line = proto + " " + codeStr + " " + text + "\r\n"
}

return line
}

// writeHTTPResponseHeaders writes the necessary response headers prior to
// writing an HTTP body given a request to use for protocol negotiation, headers
// to write, a status code, and a writer.
func (s *rpcServer) writeHTTPResponseHeaders(req *http.Request, headers http.Header, code int, w io.Writer) error {
_, err := io.WriteString(w, s.httpStatusLine(req, code))
if err != nil {
return err
}

err = headers.Write(w)
if err != nil {
return err
}

_, err = io.WriteString(w, "\r\n")
return err
}

// Stop is used by server.go to stop the rpc listener.
func (s *rpcServer) Stop() error {
if atomic.AddInt32(&s.shutdown, 1) != 1 {
Expand Down Expand Up @@ -4313,52 +4257,30 @@ func (s *rpcServer) jsonRPCRead(w http.ResponseWriter, r *http.Request, isAdmin
return
}

// Setup a close notifier to stop any long polling routines.
closeChan := make(chan struct{}, 1)
notify := w.(http.CloseNotifier).CloseNotify()

go func() {
select {
case <-notify:
close(closeChan)
return
default:
}
}()

// Read and close the JSON-RPC request body from the caller.
body, err := ioutil.ReadAll(r.Body)
r.Body.Close()

if err != nil {
errCode := http.StatusBadRequest
http.Error(w, fmt.Sprintf("%d error reading JSON message: %v",
errCode, err), errCode)
return
}

// Unfortunately, the http server doesn't provide the ability to
// change the read deadline for the new connection and having one breaks
// long polling. However, not having a read deadline on the initial
// connection would mean clients can connect and idle forever. Thus,
// hijack the connecton from the HTTP server, clear the read deadline,
// and handle writing the response manually.
hj, ok := w.(http.Hijacker)
if !ok {
errMsg := "webserver doesn't support hijacking"
rpcsLog.Warnf(errMsg)
errCode := http.StatusInternalServerError
http.Error(w, strconv.Itoa(errCode)+" "+errMsg, errCode)
return
}

conn, buf, err := hj.Hijack()
if err != nil {
rpcsLog.Warnf("Failed to hijack HTTP connection: %v", err)
errCode := http.StatusInternalServerError
http.Error(w, strconv.Itoa(errCode)+" "+err.Error(), errCode)
return
}

defer conn.Close()
defer buf.Flush()
conn.SetReadDeadline(timeZeroVal)
// Setup a close notifier. Since the connection is hijacked,
// the CloseNotifer on the ResponseWriter is not available.
closeChan := make(chan struct{}, 1)
go func() {
_, err = conn.Read(make([]byte, 1))
if err != nil {
close(closeChan)
}
}()

var results []json.RawMessage
var batchSize int
var batchedRequest bool
Expand Down Expand Up @@ -4510,18 +4432,14 @@ func (s *rpcServer) jsonRPCRead(w http.ResponseWriter, r *http.Request, isAdmin
}
}

// Write the response.
err = s.writeHTTPResponseHeaders(r, w.Header(), http.StatusOK, buf)
if err != nil {
rpcsLog.Error(err)
return
}
if _, err := buf.Write(msg); err != nil {
w.Header().Set("Content-Length", strconv.Itoa(len(msg)+1))

if _, err := w.Write(msg); err != nil {
rpcsLog.Errorf("Failed to write marshalled reply: %v", err)
}

// Terminate with newline to maintain compatibility with Bitcoin Core.
if err := buf.WriteByte('\n'); err != nil {
if _, err := w.Write([]byte("\n")); err != nil {
rpcsLog.Errorf("Failed to append terminating newline to reply: %v", err)
}
}
Expand All @@ -4543,14 +4461,11 @@ func (s *rpcServer) Start() {
httpServer := &http.Server{
Handler: rpcServeMux,

// Timeout connections which don't complete the initial
// handshake within the allowed timeframe.
ReadTimeout: time.Second * rpcAuthTimeoutSeconds,
// Allow long polling.
ReadTimeout: 0,
}
rpcServeMux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Connection", "close")
w.Header().Set("Content-Type", "application/json")
r.Close = true

// Limit the number of connections to max allowed.
if s.limitConnections(w, r.RemoteAddr) {
Expand All @@ -4560,6 +4475,7 @@ func (s *rpcServer) Start() {
// Keep track of the number of connected clients.
s.incrementClients()
defer s.decrementClients()

_, isAdmin, err := s.checkAuth(r, true)
if err != nil {
jsonAuthFail(w)
Expand Down

0 comments on commit c196d09

Please sign in to comment.