Skip to content

Commit

Permalink
Merge pull request #1925 from YaoZengzeng/stream-race
Browse files Browse the repository at this point in the history
bugfix: data race in pkg cri/stream
  • Loading branch information
Wei Fu authored Aug 2, 2018
2 parents 6b31add + 1e1f3f2 commit a115eb8
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions cri/stream/portforward/httpstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,15 @@ func handleHTTPStreams(ctx context.Context, w http.ResponseWriter, req *http.Req
}
streamChan := make(chan httpstream.Stream, 1)

logrus.Infof("Upgrading port forward response")
logrus.Infof("upgrading port forward response")
upgrader := spdy.NewResponseUpgrader()
conn := upgrader.UpgradeResponse(w, req, httpStreamReceived(streamChan))
if conn == nil {
return fmt.Errorf("unable to upgrade connection")
}
defer conn.Close()

logrus.Infof("(conn=%p) setting forwarding streaming connection idle timeout to %v", conn, idleTimeout)
logrus.Infof("setting forwarding streaming connection idle timeout to %v", idleTimeout)
conn.SetIdleTimeout(idleTimeout)

h := &httpStreamHandler{
Expand Down Expand Up @@ -99,11 +99,11 @@ type httpStreamHandler struct {
func (h *httpStreamHandler) getStreamPair(requestID string) (*httpStreamPair, bool) {
p, ok := h.streamPairs.Get(requestID).Result()
if ok {
logrus.Infof("(conn=%p, request=%s) found existing stream pair", h.conn, requestID)
logrus.Infof("portforward of cri: found existing stream pair for request %s", requestID)
return p.(*httpStreamPair), false
}

logrus.Infof("(conn=%p, request=%s) creating new stream pair", h.conn, requestID)
logrus.Infof("portforward of cri: creating new stream pair for request %s", requestID)

pair := newPortForwardPair(requestID)
h.streamPairs.Put(requestID, pair)
Expand All @@ -129,10 +129,10 @@ func (h *httpStreamHandler) removeStreamPair(requestID string) {
func (h *httpStreamHandler) monitorStreamPair(p *httpStreamPair, timeout <-chan time.Time) {
select {
case <-timeout:
msg := fmt.Sprintf("(conn=%v, request=%s) timed out waiting for streams", h.conn, p.requestID)
msg := fmt.Sprintf("portforward of cri: timed out waiting for streams of request %s", p.requestID)
p.printError(msg)
case <-p.complete:
logrus.Infof("(conn=%v, request=%s) successfully received error and data streams", h.conn, p.requestID)
logrus.Infof("portforward of cri: successfully received error and data streams of request %s", p.requestID)
}
h.removeStreamPair(p.requestID)
}
Expand All @@ -152,24 +152,24 @@ func (h *httpStreamHandler) requestID(stream httpstream.Stream) string {
// invoking portForward for each complete stream pair. The loop exits
// when the httpstream.Connection is closed.
func (h *httpStreamHandler) run(ctx context.Context) {
logrus.Infof("(conn=%p) waiting for port forward streams", h.conn)
logrus.Infof("portforward of cri: waiting for streams")

for {
select {
case <-h.conn.CloseChan():
logrus.Infof("(conn=%p) upgraded connection closed", h.conn)
logrus.Infof("portforward of cri: upgraded connection closed")
return
case stream := <-h.streamChan:
requestID := h.requestID(stream)
streamType := stream.Headers().Get(constant.StreamType)
logrus.Infof("(conn=%p, request=%s) received new stream of type %s", h.conn, requestID, streamType)
logrus.Infof("portForward of cri: received new stream of type %s, request %s", streamType, requestID)

p, created := h.getStreamPair(requestID)
if created {
go h.monitorStreamPair(p, time.After(h.streamCreationTimeout))
}
if complete, err := p.add(stream); err != nil {
msg := fmt.Sprintf("error processing stream for request %s: %v", requestID, err)
msg := fmt.Sprintf("portforward of cri: error processing stream for request %s: %v", requestID, err)
p.printError(msg)
} else if complete {
go h.portForward(ctx, p)
Expand All @@ -187,12 +187,12 @@ func (h *httpStreamHandler) portForward(ctx context.Context, p *httpStreamPair)
portString := p.dataStream.Headers().Get(constant.PortHeader)
port, _ := strconv.ParseInt(portString, 10, 32)

logrus.Infof("(conn=%p, request=%s) invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
logrus.Infof("portforward of cri: invoking forwarder.PortForward for port %s of request %s", portString, p.requestID)
err := h.forwarder.PortForward(ctx, h.pod, int32(port), p.dataStream)
logrus.Infof("(conn=%p, request=%s) done invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString)
logrus.Infof("portforward of cri: done invoking forwarder.PortForward for port %s", portString, p.requestID)

if err != nil {
msg := fmt.Sprintf("error forwarding port %d to pod %s: %v", port, h.pod, err)
msg := fmt.Sprintf("portforward of cri: error forwarding port %d to pod %s: %v", port, h.pod, err)
p.printError(msg)
}
}
Expand Down

0 comments on commit a115eb8

Please sign in to comment.