Skip to content

Commit

Permalink
bugfix: data race in pkg cri/stream
Browse files Browse the repository at this point in the history
Signed-off-by: YaoZengzeng <[email protected]>
  • Loading branch information
YaoZengzeng committed Jul 31, 2018
1 parent 7df653e commit 7882693
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions cri/stream/portforward/httpstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func handleHTTPStreams(ctx context.Context, w http.ResponseWriter, req *http.Req
}
defer conn.Close()

logrus.Infof("(conn=%p) setting forwarding streaming connection idle timeout to %v", conn, idleTimeout)
logrus.Infof("Setting forwarding streaming connection idle timeout to %v", idleTimeout)
conn.SetIdleTimeout(idleTimeout)

h := &httpStreamHandler{
Expand Down Expand Up @@ -99,11 +99,11 @@ type httpStreamHandler struct {
func (h *httpStreamHandler) getStreamPair(requestID string) (*httpStreamPair, bool) {
p, ok := h.streamPairs.Get(requestID).Result()
if ok {
logrus.Infof("(conn=%p, request=%s) found existing stream pair", h.conn, requestID)
logrus.Infof("PortForward of CRI: found existing stream pair for request %s", requestID)
return p.(*httpStreamPair), false
}

logrus.Infof("(conn=%p, request=%s) creating new stream pair", h.conn, requestID)
logrus.Infof("PortForward of CRI: creating new stream pair for request %s", requestID)

pair := newPortForwardPair(requestID)
h.streamPairs.Put(requestID, pair)
Expand All @@ -129,10 +129,10 @@ func (h *httpStreamHandler) removeStreamPair(requestID string) {
func (h *httpStreamHandler) monitorStreamPair(p *httpStreamPair, timeout <-chan time.Time) {
select {
case <-timeout:
msg := fmt.Sprintf("(conn=%v, request=%s) timed out waiting for streams", h.conn, p.requestID)
msg := fmt.Sprintf("PortForward of CRI: timed out waiting for streams of request %s", p.requestID)
p.printError(msg)
case <-p.complete:
logrus.Infof("(conn=%v, request=%s) successfully received error and data streams", h.conn, p.requestID)
logrus.Infof("PortForward of CRI: successfully received error and data streams of request %s", p.requestID)
}
h.removeStreamPair(p.requestID)
}
Expand All @@ -152,24 +152,24 @@ func (h *httpStreamHandler) requestID(stream httpstream.Stream) string {
// invoking portForward for each complete stream pair. The loop exits
// when the httpstream.Connection is closed.
func (h *httpStreamHandler) run(ctx context.Context) {
logrus.Infof("(conn=%p) waiting for port forward streams", h.conn)
logrus.Infof("PortForward of CRI: waiting for streams")

for {
select {
case <-h.conn.CloseChan():
logrus.Infof("(conn=%p) upgraded connection closed", h.conn)
logrus.Infof("PortForward of CRI: upgraded connection closed")
return
case stream := <-h.streamChan:
requestID := h.requestID(stream)
streamType := stream.Headers().Get(constant.StreamType)
logrus.Infof("(conn=%p, request=%s) received new stream of type %s", h.conn, requestID, streamType)
logrus.Infof("PortForward of CRI: received new stream of type %s, request %s", streamType, requestID)

p, created := h.getStreamPair(requestID)
if created {
go h.monitorStreamPair(p, time.After(h.streamCreationTimeout))
}
if complete, err := p.add(stream); err != nil {
msg := fmt.Sprintf("error processing stream for request %s: %v", requestID, err)
msg := fmt.Sprintf("PortForward of CRI: error processing stream for request %s: %v", requestID, err)
p.printError(msg)
} else if complete {
go h.portForward(ctx, p)
Expand All @@ -187,12 +187,12 @@ func (h *httpStreamHandler) portForward(ctx context.Context, p *httpStreamPair)
portString := p.dataStream.Headers().Get(constant.PortHeader)
port, _ := strconv.ParseInt(portString, 10, 32)

logrus.Infof("(conn=%p, request=%s) invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
logrus.Infof("PortForward of CRI: invoking forwarder.PortForward for port %s of request %s", portString, p.requestID)
err := h.forwarder.PortForward(ctx, h.pod, int32(port), p.dataStream)
logrus.Infof("(conn=%p, request=%s) done invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
logrus.Infof("PortForward of CRI: done invoking forwarder.PortForward for port %s", portString, p.requestID)

if err != nil {
msg := fmt.Sprintf("error forwarding port %d to pod %s: %v", port, h.pod, err)
msg := fmt.Sprintf("PortForward of CRI: error forwarding port %d to pod %s: %v", port, h.pod, err)
p.printError(msg)
}
}
Expand Down

0 comments on commit 7882693

Please sign in to comment.