From 1e1f3f23be1340471d5d4345697d988e4c284407 Mon Sep 17 00:00:00 2001 From: YaoZengzeng Date: Mon, 30 Jul 2018 16:33:41 +0800 Subject: [PATCH] bugfix: data race in pkg cri/stream Signed-off-by: YaoZengzeng --- cri/stream/portforward/httpstream.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/cri/stream/portforward/httpstream.go b/cri/stream/portforward/httpstream.go index 11aa390c9..e6142df52 100644 --- a/cri/stream/portforward/httpstream.go +++ b/cri/stream/portforward/httpstream.go @@ -58,7 +58,7 @@ func handleHTTPStreams(ctx context.Context, w http.ResponseWriter, req *http.Req } streamChan := make(chan httpstream.Stream, 1) - logrus.Infof("Upgrading port forward response") + logrus.Infof("upgrading port forward response") upgrader := spdy.NewResponseUpgrader() conn := upgrader.UpgradeResponse(w, req, httpStreamReceived(streamChan)) if conn == nil { @@ -66,7 +66,7 @@ func handleHTTPStreams(ctx context.Context, w http.ResponseWriter, req *http.Req } defer conn.Close() - logrus.Infof("(conn=%p) setting forwarding streaming connection idle timeout to %v", conn, idleTimeout) + logrus.Infof("setting forwarding streaming connection idle timeout to %v", idleTimeout) conn.SetIdleTimeout(idleTimeout) h := &httpStreamHandler{ @@ -99,11 +99,11 @@ type httpStreamHandler struct { func (h *httpStreamHandler) getStreamPair(requestID string) (*httpStreamPair, bool) { p, ok := h.streamPairs.Get(requestID).Result() if ok { - logrus.Infof("(conn=%p, request=%s) found existing stream pair", h.conn, requestID) + logrus.Infof("portforward of cri: found existing stream pair for request %s", requestID) return p.(*httpStreamPair), false } - logrus.Infof("(conn=%p, request=%s) creating new stream pair", h.conn, requestID) + logrus.Infof("portforward of cri: creating new stream pair for request %s", requestID) pair := newPortForwardPair(requestID) h.streamPairs.Put(requestID, pair) @@ -129,10 +129,10 @@ func (h *httpStreamHandler) removeStreamPair(requestID string) { func (h *httpStreamHandler) monitorStreamPair(p *httpStreamPair, timeout <-chan time.Time) { select { case <-timeout: - msg := fmt.Sprintf("(conn=%v, request=%s) timed out waiting for streams", h.conn, p.requestID) + msg := fmt.Sprintf("portforward of cri: timed out waiting for streams of request %s", p.requestID) p.printError(msg) case <-p.complete: - logrus.Infof("(conn=%v, request=%s) successfully received error and data streams", h.conn, p.requestID) + logrus.Infof("portforward of cri: successfully received error and data streams of request %s", p.requestID) } h.removeStreamPair(p.requestID) } @@ -152,24 +152,24 @@ func (h *httpStreamHandler) requestID(stream httpstream.Stream) string { // invoking portForward for each complete stream pair. The loop exits // when the httpstream.Connection is closed. func (h *httpStreamHandler) run(ctx context.Context) { - logrus.Infof("(conn=%p) waiting for port forward streams", h.conn) + logrus.Infof("portforward of cri: waiting for streams") for { select { case <-h.conn.CloseChan(): - logrus.Infof("(conn=%p) upgraded connection closed", h.conn) + logrus.Infof("portforward of cri: upgraded connection closed") return case stream := <-h.streamChan: requestID := h.requestID(stream) streamType := stream.Headers().Get(constant.StreamType) - logrus.Infof("(conn=%p, request=%s) received new stream of type %s", h.conn, requestID, streamType) + logrus.Infof("portForward of cri: received new stream of type %s, request %s", streamType, requestID) p, created := h.getStreamPair(requestID) if created { go h.monitorStreamPair(p, time.After(h.streamCreationTimeout)) } if complete, err := p.add(stream); err != nil { - msg := fmt.Sprintf("error processing stream for request %s: %v", requestID, err) + msg := fmt.Sprintf("portforward of cri: error processing stream for request %s: %v", requestID, err) p.printError(msg) } else if complete { go h.portForward(ctx, p) @@ -187,12 +187,12 @@ func (h *httpStreamHandler) portForward(ctx context.Context, p *httpStreamPair) portString := p.dataStream.Headers().Get(constant.PortHeader) port, _ := strconv.ParseInt(portString, 10, 32) - logrus.Infof("(conn=%p, request=%s) invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString) + logrus.Infof("portforward of cri: invoking forwarder.PortForward for port %s of request %s", portString, p.requestID) err := h.forwarder.PortForward(ctx, h.pod, int32(port), p.dataStream) - logrus.Infof("(conn=%p, request=%s) done invoking forwarder.PortForward for port %s", h.conn, p.requestID, portString) + logrus.Infof("portforward of cri: done invoking forwarder.PortForward for port %s", portString, p.requestID) if err != nil { - msg := fmt.Sprintf("error forwarding port %d to pod %s: %v", port, h.pod, err) + msg := fmt.Sprintf("portforward of cri: error forwarding port %d to pod %s: %v", port, h.pod, err) p.printError(msg) } }