Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improved logging - include node url, allow X-Request-ID header(s) #15

Merged
merged 8 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ jobs:
- name: Lint
run: make lint

- name: Build
run: make build build-tee

- name: Ensure go mod tidy runs without changes
run: |
go mod tidy
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ lint:
gofmt -d -s .
gofumpt -d -extra .
go vet ./...
go vet --tags=tee ./...
staticcheck ./...
# golangci-lint run

lt: lint test

lint-strict: lint
gofumpt -d -extra .
golangci-lint run
Expand Down
4 changes: 2 additions & 2 deletions server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ func (n *Node) startProxyWorker(id int32, cancelContext context.Context) {
payload, statusCode, err := n.ProxyRequest(req.Payload, ProxyRequestTimeout)
if err != nil {
n.log.Errorw("node proxyRequest error", "uri", n.URI, "error", err)
response := SimResponse{StatusCode: statusCode, Payload: payload, Error: err, ShouldRetry: true}
response := SimResponse{StatusCode: statusCode, Payload: payload, Error: err, ShouldRetry: true, NodeURI: n.URI}
req.SendResponse(response)
continue
}

// Send response
sent := req.SendResponse(SimResponse{Payload: payload})
sent := req.SendResponse(SimResponse{Payload: payload, NodeURI: n.URI})
if !sent {
n.log.Errorw("couldn't send node response to client (SendResponse returned false)", "secSinceRequestCreated", time.Since(req.CreatedAt).Seconds())
}
Expand Down
4 changes: 2 additions & 2 deletions server/node_notee.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import (
)

func NewNode(log *zap.SugaredLogger, uri string, jobC chan *SimRequest, numWorkers int32) (*Node, error) {
url, err := url.ParseRequestURI(uri)
pURL, err := url.ParseRequestURI(uri)
if err != nil {
return nil, err
}

workersArg := url.Query().Get("_workers")
workersArg := pURL.Query().Get("_workers")
if workersArg != "" {
// set numWorkers from query param
workersInt, err := strconv.Atoi(workersArg)
Expand Down
3 changes: 2 additions & 1 deletion server/node_tee.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -51,7 +52,7 @@ func NewNode(log *zap.SugaredLogger, uri string, jobC chan *SimRequest, numWorke
}
username := pURL.User.Username()

workersArg := url.Query().Get("_workers")
workersArg := pURL.Query().Get("_workers")
if workersArg != "" {
// set numWorkers from query param
workersInt, err := strconv.Atoi(workersArg)
Expand Down
1 change: 1 addition & 0 deletions server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,5 @@ type SimResponse struct {
Payload []byte
Error error
ShouldRetry bool // When response has an error, whether it should be retried
NodeURI string
}
43 changes: 37 additions & 6 deletions server/webserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,23 @@ func (s *Webserver) HandleRootRequest(w http.ResponseWriter, req *http.Request)
func (s *Webserver) HandleQueueRequest(w http.ResponseWriter, req *http.Request) {
startTime := time.Now().UTC()
defer req.Body.Close()

// Allow single `X-Request-ID:...` log field via header
reqID := req.Header.Get("X-Request-ID")
log := s.log
if reqID != "" {
log = s.log.With("reqID", reqID)
}

// Allow multiple reqID log fields through `X-Request-ID/ABC:...` headers
for k := range req.Header {
if strings.HasPrefix(k, "X-Request-ID/") {
idTag := "reqID/" + strings.TrimPrefix(k, "X-Request-ID/")
log = log.With(idTag, req.Header.Get(k))
}
}

// Read the body and start processing
body, err := io.ReadAll(req.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand All @@ -76,7 +93,7 @@ func (s *Webserver) HandleQueueRequest(w http.ResponseWriter, req *http.Request)

ctx := req.Context()
if ctx.Err() != nil {
s.log.Infow("client closed the connection before processing", "err", ctx.Err())
log.Infow("client closed the connection before processing", "err", ctx.Err())
return
}

Expand All @@ -86,26 +103,26 @@ func (s *Webserver) HandleQueueRequest(w http.ResponseWriter, req *http.Request)
simReq := NewSimRequest(body, isHighPrio, isFastTrack)
wasAdded := s.prioQueue.Push(simReq)
if !wasAdded { // queue was full, job not added
s.log.Error("Couldn't add request, queue is full")
log.Error("Couldn't add request, queue is full")
http.Error(w, "queue full", http.StatusInternalServerError)
return
}

lenFastTrack, lenHighPrio, lenLowPrio := s.prioQueue.Len()
s.log.Infow("Request added to queue. prioQueue size:", "requestIsHighPrio", isHighPrio, "requestIsFastTrack", isFastTrack, "fastTrack", lenFastTrack, "highPrio", lenHighPrio, "lowPrio", lenLowPrio)
log.Infow("Request added to queue. prioQueue size:", "requestIsHighPrio", isHighPrio, "requestIsFastTrack", isFastTrack, "fastTrack", lenFastTrack, "highPrio", lenHighPrio, "lowPrio", lenLowPrio)

// Wait for response or cancel
for {
select {
case <-ctx.Done(): // if user closes connection, cancel the simreq
s.log.Infow("client closed the connection prematurely", "err", ctx.Err(), "queueItems", s.prioQueue.NumRequests(), "payloadSize", len(body), "requestTries", simReq.Tries, "requestCancelled", simReq.Cancelled)
log.Infow("client closed the connection prematurely", "err", ctx.Err(), "queueItems", s.prioQueue.NumRequests(), "payloadSize", len(body), "requestTries", simReq.Tries, "requestCancelled", simReq.Cancelled)
if ctx.Err() != nil {
simReq.Cancelled = true
}
return
case resp := <-simReq.ResponseC:
if resp.Error != nil {
s.log.Infow("HandleSim error", "err", resp.Error, "try", simReq.Tries, "shouldRetry", resp.ShouldRetry)
log.Infow("HandleSim error", "err", resp.Error, "try", simReq.Tries, "shouldRetry", resp.ShouldRetry, "nodeURI", resp.NodeURI)
if simReq.Tries < RequestMaxTries && resp.ShouldRetry {
s.prioQueue.Push(simReq)
continue
Expand All @@ -132,7 +149,21 @@ func (s *Webserver) HandleQueueRequest(w http.ResponseWriter, req *http.Request)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(resp.StatusCode)
w.Write(resp.Payload)
s.log.Infow("Request completed", "durationMs", time.Since(startTime).Milliseconds(), "requestIsHighPrio", isHighPrio, "requestIsFastTrack", isFastTrack, "payloadSize", len(body))

lenFastTrack, lenHighPrio, lenLowPrio := s.prioQueue.Len()
log.Infow("Request completed",
"durationMs", time.Since(startTime).Milliseconds(),
"requestIsHighPrio", isHighPrio,
"requestIsFastTrack", isFastTrack,
"payloadSize", len(body),
"statusCode", resp.StatusCode,
"nodeURI", resp.NodeURI,
"requestTries", simReq.Tries,
"queueItems", s.prioQueue.NumRequests(),
"queueItemsFastTrack", lenFastTrack,
"queueItemsHighPrio", lenHighPrio,
"queueItemsLowPrio", lenLowPrio,
)
return
}
}
Expand Down