diff --git a/go.sum b/go.sum index 285e35ac38..0168bcd378 100644 --- a/go.sum +++ b/go.sum @@ -139,8 +139,6 @@ github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGV github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8= -github.com/pborman/uuid v0.0.0-20180906182336-adf5a7427709 h1:zNBQb37RGLmJybyMcs983HfUfpkw9OTFD9tbBfAViHE= -github.com/pborman/uuid v0.0.0-20180906182336-adf5a7427709/go.mod h1:VyrYX9gd7irzKovcSS6BIIEwPRkP2Wm2m9ufcdFSJ34= github.com/pborman/uuid v1.2.0 h1:J7Q5mO4ysT1dv8hyrUGHb9+ooztCXu1D8MY8DZYsu3g= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= diff --git a/vendor/k8s.io/apimachinery/pkg/util/httpstream/doc.go b/vendor/k8s.io/apimachinery/pkg/util/httpstream/doc.go new file mode 100644 index 0000000000..5893df5bd2 --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/util/httpstream/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package httpstream adds multiplexed streaming support to HTTP requests and +// responses via connection upgrades. +package httpstream // import "k8s.io/apimachinery/pkg/util/httpstream" diff --git a/vendor/k8s.io/apimachinery/pkg/util/httpstream/httpstream.go b/vendor/k8s.io/apimachinery/pkg/util/httpstream/httpstream.go new file mode 100644 index 0000000000..50d9a366f3 --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/util/httpstream/httpstream.go @@ -0,0 +1,149 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package httpstream + +import ( + "fmt" + "io" + "net/http" + "strings" + "time" +) + +const ( + HeaderConnection = "Connection" + HeaderUpgrade = "Upgrade" + HeaderProtocolVersion = "X-Stream-Protocol-Version" + HeaderAcceptedProtocolVersions = "X-Accepted-Stream-Protocol-Versions" +) + +// NewStreamHandler defines a function that is called when a new Stream is +// received. If no error is returned, the Stream is accepted; otherwise, +// the stream is rejected. After the reply frame has been sent, replySent is closed. +type NewStreamHandler func(stream Stream, replySent <-chan struct{}) error + +// NoOpNewStreamHandler is a stream handler that accepts a new stream and +// performs no other logic. +func NoOpNewStreamHandler(stream Stream, replySent <-chan struct{}) error { return nil } + +// Dialer knows how to open a streaming connection to a server. +type Dialer interface { + + // Dial opens a streaming connection to a server using one of the protocols + // specified (in order of most preferred to least preferred). + Dial(protocols ...string) (Connection, string, error) +} + +// UpgradeRoundTripper is a type of http.RoundTripper that is able to upgrade +// HTTP requests to support multiplexed bidirectional streams. After RoundTrip() +// is invoked, if the upgrade is successful, clients may retrieve the upgraded +// connection by calling UpgradeRoundTripper.Connection(). +type UpgradeRoundTripper interface { + http.RoundTripper + // NewConnection validates the response and creates a new Connection. + NewConnection(resp *http.Response) (Connection, error) +} + +// ResponseUpgrader knows how to upgrade HTTP requests and responses to +// add streaming support to them. +type ResponseUpgrader interface { + // UpgradeResponse upgrades an HTTP response to one that supports multiplexed + // streams. newStreamHandler will be called asynchronously whenever the + // other end of the upgraded connection creates a new stream. + UpgradeResponse(w http.ResponseWriter, req *http.Request, newStreamHandler NewStreamHandler) Connection +} + +// Connection represents an upgraded HTTP connection. +type Connection interface { + // CreateStream creates a new Stream with the supplied headers. + CreateStream(headers http.Header) (Stream, error) + // Close resets all streams and closes the connection. + Close() error + // CloseChan returns a channel that is closed when the underlying connection is closed. + CloseChan() <-chan bool + // SetIdleTimeout sets the amount of time the connection may remain idle before + // it is automatically closed. + SetIdleTimeout(timeout time.Duration) +} + +// Stream represents a bidirectional communications channel that is part of an +// upgraded connection. +type Stream interface { + io.ReadWriteCloser + // Reset closes both directions of the stream, indicating that neither client + // or server can use it any more. + Reset() error + // Headers returns the headers used to create the stream. + Headers() http.Header + // Identifier returns the stream's ID. + Identifier() uint32 +} + +// IsUpgradeRequest returns true if the given request is a connection upgrade request +func IsUpgradeRequest(req *http.Request) bool { + for _, h := range req.Header[http.CanonicalHeaderKey(HeaderConnection)] { + if strings.Contains(strings.ToLower(h), strings.ToLower(HeaderUpgrade)) { + return true + } + } + return false +} + +func negotiateProtocol(clientProtocols, serverProtocols []string) string { + for i := range clientProtocols { + for j := range serverProtocols { + if clientProtocols[i] == serverProtocols[j] { + return clientProtocols[i] + } + } + } + return "" +} + +// Handshake performs a subprotocol negotiation. If the client did request a +// subprotocol, Handshake will select the first common value found in +// serverProtocols. If a match is found, Handshake adds a response header +// indicating the chosen subprotocol. If no match is found, HTTP forbidden is +// returned, along with a response header containing the list of protocols the +// server can accept. +func Handshake(req *http.Request, w http.ResponseWriter, serverProtocols []string) (string, error) { + clientProtocols := req.Header[http.CanonicalHeaderKey(HeaderProtocolVersion)] + if len(clientProtocols) == 0 { + // Kube 1.0 clients didn't support subprotocol negotiation. + // TODO require clientProtocols once Kube 1.0 is no longer supported + return "", nil + } + + if len(serverProtocols) == 0 { + // Kube 1.0 servers didn't support subprotocol negotiation. This is mainly for testing. + // TODO require serverProtocols once Kube 1.0 is no longer supported + return "", nil + } + + negotiatedProtocol := negotiateProtocol(clientProtocols, serverProtocols) + if len(negotiatedProtocol) == 0 { + for i := range serverProtocols { + w.Header().Add(HeaderAcceptedProtocolVersions, serverProtocols[i]) + } + err := fmt.Errorf("unable to upgrade: unable to negotiate protocol: client supports %v, server accepts %v", clientProtocols, serverProtocols) + http.Error(w, err.Error(), http.StatusForbidden) + return "", err + } + + w.Header().Add(HeaderProtocolVersion, negotiatedProtocol) + return negotiatedProtocol, nil +} diff --git a/vendor/k8s.io/apimachinery/pkg/util/httpstream/httpstream_test.go b/vendor/k8s.io/apimachinery/pkg/util/httpstream/httpstream_test.go new file mode 100644 index 0000000000..f7f9a3ebf4 --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/util/httpstream/httpstream_test.go @@ -0,0 +1,125 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package httpstream + +import ( + "net/http" + "reflect" + "testing" +) + +type responseWriter struct { + header http.Header + statusCode *int +} + +func newResponseWriter() *responseWriter { + return &responseWriter{ + header: make(http.Header), + } +} + +func (r *responseWriter) Header() http.Header { + return r.header +} + +func (r *responseWriter) WriteHeader(code int) { + r.statusCode = &code +} + +func (r *responseWriter) Write([]byte) (int, error) { + return 0, nil +} + +func TestHandshake(t *testing.T) { + tests := map[string]struct { + clientProtocols []string + serverProtocols []string + expectedProtocol string + expectError bool + }{ + "no client protocols": { + clientProtocols: []string{}, + serverProtocols: []string{"a", "b"}, + expectedProtocol: "", + }, + "no common protocol": { + clientProtocols: []string{"c"}, + serverProtocols: []string{"a", "b"}, + expectedProtocol: "", + expectError: true, + }, + "common protocol": { + clientProtocols: []string{"b"}, + serverProtocols: []string{"a", "b"}, + expectedProtocol: "b", + }, + } + + for name, test := range tests { + req, err := http.NewRequest("GET", "http://www.example.com/", nil) + if err != nil { + t.Fatalf("%s: error creating request: %v", name, err) + } + + for _, p := range test.clientProtocols { + req.Header.Add(HeaderProtocolVersion, p) + } + + w := newResponseWriter() + negotiated, err := Handshake(req, w, test.serverProtocols) + + // verify negotiated protocol + if e, a := test.expectedProtocol, negotiated; e != a { + t.Errorf("%s: protocol: expected %q, got %q", name, e, a) + } + + if test.expectError { + if err == nil { + t.Errorf("%s: expected error but did not get one", name) + } + if w.statusCode == nil { + t.Errorf("%s: expected w.statusCode to be set", name) + } else if e, a := http.StatusForbidden, *w.statusCode; e != a { + t.Errorf("%s: w.statusCode: expected %d, got %d", name, e, a) + } + if e, a := test.serverProtocols, w.Header()[HeaderAcceptedProtocolVersions]; !reflect.DeepEqual(e, a) { + t.Errorf("%s: accepted server protocols: expected %v, got %v", name, e, a) + } + continue + } + if !test.expectError && err != nil { + t.Errorf("%s: unexpected error: %v", name, err) + continue + } + if w.statusCode != nil { + t.Errorf("%s: unexpected non-nil w.statusCode: %d", name, w.statusCode) + } + + if len(test.expectedProtocol) == 0 { + if len(w.Header()[HeaderProtocolVersion]) > 0 { + t.Errorf("%s: unexpected protocol version response header: %s", name, w.Header()[HeaderProtocolVersion]) + } + continue + } + + // verify response headers + if e, a := []string{test.expectedProtocol}, w.Header()[HeaderProtocolVersion]; !reflect.DeepEqual(e, a) { + t.Errorf("%s: protocol response header: expected %v, got %v", name, e, a) + } + } +} diff --git a/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go b/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go new file mode 100644 index 0000000000..9d222faa89 --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go @@ -0,0 +1,145 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package spdy + +import ( + "net" + "net/http" + "sync" + "time" + + "github.com/docker/spdystream" + "k8s.io/apimachinery/pkg/util/httpstream" + "k8s.io/klog" +) + +// connection maintains state about a spdystream.Connection and its associated +// streams. +type connection struct { + conn *spdystream.Connection + streams []httpstream.Stream + streamLock sync.Mutex + newStreamHandler httpstream.NewStreamHandler +} + +// NewClientConnection creates a new SPDY client connection. +func NewClientConnection(conn net.Conn) (httpstream.Connection, error) { + spdyConn, err := spdystream.NewConnection(conn, false) + if err != nil { + defer conn.Close() + return nil, err + } + + return newConnection(spdyConn, httpstream.NoOpNewStreamHandler), nil +} + +// NewServerConnection creates a new SPDY server connection. newStreamHandler +// will be invoked when the server receives a newly created stream from the +// client. +func NewServerConnection(conn net.Conn, newStreamHandler httpstream.NewStreamHandler) (httpstream.Connection, error) { + spdyConn, err := spdystream.NewConnection(conn, true) + if err != nil { + defer conn.Close() + return nil, err + } + + return newConnection(spdyConn, newStreamHandler), nil +} + +// newConnection returns a new connection wrapping conn. newStreamHandler +// will be invoked when the server receives a newly created stream from the +// client. +func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler) httpstream.Connection { + c := &connection{conn: conn, newStreamHandler: newStreamHandler} + go conn.Serve(c.newSpdyStream) + return c +} + +// createStreamResponseTimeout indicates how long to wait for the other side to +// acknowledge the new stream before timing out. +const createStreamResponseTimeout = 30 * time.Second + +// Close first sends a reset for all of the connection's streams, and then +// closes the underlying spdystream.Connection. +func (c *connection) Close() error { + c.streamLock.Lock() + for _, s := range c.streams { + // calling Reset instead of Close ensures that all streams are fully torn down + s.Reset() + } + c.streams = make([]httpstream.Stream, 0) + c.streamLock.Unlock() + + // now that all streams are fully torn down, it's safe to call close on the underlying connection, + // which should be able to terminate immediately at this point, instead of waiting for any + // remaining graceful stream termination. + return c.conn.Close() +} + +// CreateStream creates a new stream with the specified headers and registers +// it with the connection. +func (c *connection) CreateStream(headers http.Header) (httpstream.Stream, error) { + stream, err := c.conn.CreateStream(headers, nil, false) + if err != nil { + return nil, err + } + if err = stream.WaitTimeout(createStreamResponseTimeout); err != nil { + return nil, err + } + + c.registerStream(stream) + return stream, nil +} + +// registerStream adds the stream s to the connection's list of streams that +// it owns. +func (c *connection) registerStream(s httpstream.Stream) { + c.streamLock.Lock() + c.streams = append(c.streams, s) + c.streamLock.Unlock() +} + +// CloseChan returns a channel that, when closed, indicates that the underlying +// spdystream.Connection has been closed. +func (c *connection) CloseChan() <-chan bool { + return c.conn.CloseChan() +} + +// newSpdyStream is the internal new stream handler used by spdystream.Connection.Serve. +// It calls connection's newStreamHandler, giving it the opportunity to accept or reject +// the stream. If newStreamHandler returns an error, the stream is rejected. If not, the +// stream is accepted and registered with the connection. +func (c *connection) newSpdyStream(stream *spdystream.Stream) { + replySent := make(chan struct{}) + err := c.newStreamHandler(stream, replySent) + rejectStream := (err != nil) + if rejectStream { + klog.Warningf("Stream rejected: %v", err) + stream.Reset() + return + } + + c.registerStream(stream) + stream.SendReply(http.Header{}, rejectStream) + close(replySent) +} + +// SetIdleTimeout sets the amount of time the connection may remain idle before +// it is automatically closed. +func (c *connection) SetIdleTimeout(timeout time.Duration) { + c.conn.SetIdleTimeout(timeout) +} diff --git a/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection_test.go b/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection_test.go new file mode 100644 index 0000000000..f94e2b3ba4 --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection_test.go @@ -0,0 +1,164 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package spdy + +import ( + "io" + "net" + "net/http" + "sync" + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/httpstream" +) + +func runProxy(t *testing.T, backendUrl string, proxyUrl chan<- string, proxyDone chan<- struct{}) { + listener, err := net.Listen("tcp4", "localhost:0") + if err != nil { + t.Fatalf("error listening: %v", err) + } + defer listener.Close() + + proxyUrl <- listener.Addr().String() + + clientConn, err := listener.Accept() + if err != nil { + t.Errorf("proxy: error accepting client connection: %v", err) + return + } + + backendConn, err := net.Dial("tcp4", backendUrl) + if err != nil { + t.Errorf("proxy: error dialing backend: %v", err) + return + } + defer backendConn.Close() + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + io.Copy(backendConn, clientConn) + }() + + go func() { + defer wg.Done() + io.Copy(clientConn, backendConn) + }() + + wg.Wait() + + proxyDone <- struct{}{} +} + +func runServer(t *testing.T, backendUrl chan<- string, serverDone chan<- struct{}) { + listener, err := net.Listen("tcp4", "localhost:0") + if err != nil { + t.Fatalf("server: error listening: %v", err) + } + defer listener.Close() + + backendUrl <- listener.Addr().String() + + conn, err := listener.Accept() + if err != nil { + t.Errorf("server: error accepting connection: %v", err) + return + } + + streamChan := make(chan httpstream.Stream) + replySentChan := make(chan (<-chan struct{})) + spdyConn, err := NewServerConnection(conn, func(stream httpstream.Stream, replySent <-chan struct{}) error { + streamChan <- stream + replySentChan <- replySent + return nil + }) + if err != nil { + t.Errorf("server: error creating spdy connection: %v", err) + return + } + + stream := <-streamChan + replySent := <-replySentChan + <-replySent + + buf := make([]byte, 1) + _, err = stream.Read(buf) + if err != io.EOF { + t.Errorf("server: unexpected read error: %v", err) + return + } + + <-spdyConn.CloseChan() + raw := spdyConn.(*connection).conn + if err := raw.Wait(15 * time.Second); err != nil { + t.Errorf("server: timed out waiting for connection closure: %v", err) + } + + serverDone <- struct{}{} +} + +func TestConnectionCloseIsImmediateThroughAProxy(t *testing.T) { + serverDone := make(chan struct{}) + backendUrlChan := make(chan string) + go runServer(t, backendUrlChan, serverDone) + backendUrl := <-backendUrlChan + + proxyDone := make(chan struct{}) + proxyUrlChan := make(chan string) + go runProxy(t, backendUrl, proxyUrlChan, proxyDone) + proxyUrl := <-proxyUrlChan + + conn, err := net.Dial("tcp4", proxyUrl) + if err != nil { + t.Fatalf("client: error connecting to proxy: %v", err) + } + + spdyConn, err := NewClientConnection(conn) + if err != nil { + t.Fatalf("client: error creating spdy connection: %v", err) + } + + if _, err := spdyConn.CreateStream(http.Header{}); err != nil { + t.Fatalf("client: error creating stream: %v", err) + } + + spdyConn.Close() + raw := spdyConn.(*connection).conn + if err := raw.Wait(15 * time.Second); err != nil { + t.Fatalf("client: timed out waiting for connection closure: %v", err) + } + + expired := time.NewTimer(15 * time.Second) + defer expired.Stop() + i := 0 + for { + select { + case <-expired.C: + t.Fatalf("timed out waiting for proxy and/or server closure") + case <-serverDone: + i++ + case <-proxyDone: + i++ + } + if i == 2 { + break + } + } +} diff --git a/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go b/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go new file mode 100644 index 0000000000..2699597e7a --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go @@ -0,0 +1,335 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package spdy + +import ( + "bufio" + "bytes" + "context" + "crypto/tls" + "encoding/base64" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "net/http/httputil" + "net/url" + "strings" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/httpstream" + utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apimachinery/third_party/forked/golang/netutil" +) + +// SpdyRoundTripper knows how to upgrade an HTTP request to one that supports +// multiplexed streams. After RoundTrip() is invoked, Conn will be set +// and usable. SpdyRoundTripper implements the UpgradeRoundTripper interface. +type SpdyRoundTripper struct { + //tlsConfig holds the TLS configuration settings to use when connecting + //to the remote server. + tlsConfig *tls.Config + + /* TODO according to http://golang.org/pkg/net/http/#RoundTripper, a RoundTripper + must be safe for use by multiple concurrent goroutines. If this is absolutely + necessary, we could keep a map from http.Request to net.Conn. In practice, + a client will create an http.Client, set the transport to a new insteace of + SpdyRoundTripper, and use it a single time, so this hopefully won't be an issue. + */ + // conn is the underlying network connection to the remote server. + conn net.Conn + + // Dialer is the dialer used to connect. Used if non-nil. + Dialer *net.Dialer + + // proxier knows which proxy to use given a request, defaults to http.ProxyFromEnvironment + // Used primarily for mocking the proxy discovery in tests. + proxier func(req *http.Request) (*url.URL, error) + + // followRedirects indicates if the round tripper should examine responses for redirects and + // follow them. + followRedirects bool + // requireSameHostRedirects restricts redirect following to only follow redirects to the same host + // as the original request. + requireSameHostRedirects bool +} + +var _ utilnet.TLSClientConfigHolder = &SpdyRoundTripper{} +var _ httpstream.UpgradeRoundTripper = &SpdyRoundTripper{} +var _ utilnet.Dialer = &SpdyRoundTripper{} + +// NewRoundTripper creates a new SpdyRoundTripper that will use +// the specified tlsConfig. +func NewRoundTripper(tlsConfig *tls.Config, followRedirects, requireSameHostRedirects bool) httpstream.UpgradeRoundTripper { + return NewSpdyRoundTripper(tlsConfig, followRedirects, requireSameHostRedirects) +} + +// NewSpdyRoundTripper creates a new SpdyRoundTripper that will use +// the specified tlsConfig. This function is mostly meant for unit tests. +func NewSpdyRoundTripper(tlsConfig *tls.Config, followRedirects, requireSameHostRedirects bool) *SpdyRoundTripper { + return &SpdyRoundTripper{ + tlsConfig: tlsConfig, + followRedirects: followRedirects, + requireSameHostRedirects: requireSameHostRedirects, + } +} + +// TLSClientConfig implements pkg/util/net.TLSClientConfigHolder for proper TLS checking during +// proxying with a spdy roundtripper. +func (s *SpdyRoundTripper) TLSClientConfig() *tls.Config { + return s.tlsConfig +} + +// Dial implements k8s.io/apimachinery/pkg/util/net.Dialer. +func (s *SpdyRoundTripper) Dial(req *http.Request) (net.Conn, error) { + conn, err := s.dial(req) + if err != nil { + return nil, err + } + + if err := req.Write(conn); err != nil { + conn.Close() + return nil, err + } + + return conn, nil +} + +// dial dials the host specified by req, using TLS if appropriate, optionally +// using a proxy server if one is configured via environment variables. +func (s *SpdyRoundTripper) dial(req *http.Request) (net.Conn, error) { + proxier := s.proxier + if proxier == nil { + proxier = utilnet.NewProxierWithNoProxyCIDR(http.ProxyFromEnvironment) + } + proxyURL, err := proxier(req) + if err != nil { + return nil, err + } + + if proxyURL == nil { + return s.dialWithoutProxy(req.Context(), req.URL) + } + + // ensure we use a canonical host with proxyReq + targetHost := netutil.CanonicalAddr(req.URL) + + // proxying logic adapted from http://blog.h6t.eu/post/74098062923/golang-websocket-with-http-proxy-support + proxyReq := http.Request{ + Method: "CONNECT", + URL: &url.URL{}, + Host: targetHost, + } + + if pa := s.proxyAuth(proxyURL); pa != "" { + proxyReq.Header = http.Header{} + proxyReq.Header.Set("Proxy-Authorization", pa) + } + + proxyDialConn, err := s.dialWithoutProxy(req.Context(), proxyURL) + if err != nil { + return nil, err + } + + proxyClientConn := httputil.NewProxyClientConn(proxyDialConn, nil) + _, err = proxyClientConn.Do(&proxyReq) + if err != nil && err != httputil.ErrPersistEOF { + return nil, err + } + + rwc, _ := proxyClientConn.Hijack() + + if req.URL.Scheme != "https" { + return rwc, nil + } + + host, _, err := net.SplitHostPort(targetHost) + if err != nil { + return nil, err + } + + tlsConfig := s.tlsConfig + switch { + case tlsConfig == nil: + tlsConfig = &tls.Config{ServerName: host} + case len(tlsConfig.ServerName) == 0: + tlsConfig = tlsConfig.Clone() + tlsConfig.ServerName = host + } + + tlsConn := tls.Client(rwc, tlsConfig) + + // need to manually call Handshake() so we can call VerifyHostname() below + if err := tlsConn.Handshake(); err != nil { + return nil, err + } + + // Return if we were configured to skip validation + if tlsConfig.InsecureSkipVerify { + return tlsConn, nil + } + + if err := tlsConn.VerifyHostname(tlsConfig.ServerName); err != nil { + return nil, err + } + + return tlsConn, nil +} + +// dialWithoutProxy dials the host specified by url, using TLS if appropriate. +func (s *SpdyRoundTripper) dialWithoutProxy(ctx context.Context, url *url.URL) (net.Conn, error) { + dialAddr := netutil.CanonicalAddr(url) + + if url.Scheme == "http" { + if s.Dialer == nil { + var d net.Dialer + return d.DialContext(ctx, "tcp", dialAddr) + } else { + return s.Dialer.DialContext(ctx, "tcp", dialAddr) + } + } + + // TODO validate the TLSClientConfig is set up? + var conn *tls.Conn + var err error + if s.Dialer == nil { + conn, err = tls.Dial("tcp", dialAddr, s.tlsConfig) + } else { + conn, err = tls.DialWithDialer(s.Dialer, "tcp", dialAddr, s.tlsConfig) + } + if err != nil { + return nil, err + } + + // Return if we were configured to skip validation + if s.tlsConfig != nil && s.tlsConfig.InsecureSkipVerify { + return conn, nil + } + + host, _, err := net.SplitHostPort(dialAddr) + if err != nil { + return nil, err + } + if s.tlsConfig != nil && len(s.tlsConfig.ServerName) > 0 { + host = s.tlsConfig.ServerName + } + err = conn.VerifyHostname(host) + if err != nil { + return nil, err + } + + return conn, nil +} + +// proxyAuth returns, for a given proxy URL, the value to be used for the Proxy-Authorization header +func (s *SpdyRoundTripper) proxyAuth(proxyURL *url.URL) string { + if proxyURL == nil || proxyURL.User == nil { + return "" + } + credentials := proxyURL.User.String() + encodedAuth := base64.StdEncoding.EncodeToString([]byte(credentials)) + return fmt.Sprintf("Basic %s", encodedAuth) +} + +// RoundTrip executes the Request and upgrades it. After a successful upgrade, +// clients may call SpdyRoundTripper.Connection() to retrieve the upgraded +// connection. +func (s *SpdyRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + header := utilnet.CloneHeader(req.Header) + header.Add(httpstream.HeaderConnection, httpstream.HeaderUpgrade) + header.Add(httpstream.HeaderUpgrade, HeaderSpdy31) + + var ( + conn net.Conn + rawResponse []byte + err error + ) + + if s.followRedirects { + conn, rawResponse, err = utilnet.ConnectWithRedirects(req.Method, req.URL, header, req.Body, s, s.requireSameHostRedirects) + } else { + clone := utilnet.CloneRequest(req) + clone.Header = header + conn, err = s.Dial(clone) + } + if err != nil { + return nil, err + } + + responseReader := bufio.NewReader( + io.MultiReader( + bytes.NewBuffer(rawResponse), + conn, + ), + ) + + resp, err := http.ReadResponse(responseReader, nil) + if err != nil { + if conn != nil { + conn.Close() + } + return nil, err + } + + s.conn = conn + + return resp, nil +} + +// NewConnection validates the upgrade response, creating and returning a new +// httpstream.Connection if there were no errors. +func (s *SpdyRoundTripper) NewConnection(resp *http.Response) (httpstream.Connection, error) { + connectionHeader := strings.ToLower(resp.Header.Get(httpstream.HeaderConnection)) + upgradeHeader := strings.ToLower(resp.Header.Get(httpstream.HeaderUpgrade)) + if (resp.StatusCode != http.StatusSwitchingProtocols) || !strings.Contains(connectionHeader, strings.ToLower(httpstream.HeaderUpgrade)) || !strings.Contains(upgradeHeader, strings.ToLower(HeaderSpdy31)) { + defer resp.Body.Close() + responseError := "" + responseErrorBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + responseError = "unable to read error from server response" + } else { + // TODO: I don't belong here, I should be abstracted from this class + if obj, _, err := statusCodecs.UniversalDecoder().Decode(responseErrorBytes, nil, &metav1.Status{}); err == nil { + if status, ok := obj.(*metav1.Status); ok { + return nil, &apierrors.StatusError{ErrStatus: *status} + } + } + responseError = string(responseErrorBytes) + responseError = strings.TrimSpace(responseError) + } + + return nil, fmt.Errorf("unable to upgrade connection: %s", responseError) + } + + return NewClientConnection(s.conn) +} + +// statusScheme is private scheme for the decoding here until someone fixes the TODO in NewConnection +var statusScheme = runtime.NewScheme() + +// ParameterCodec knows about query parameters used with the meta v1 API spec. +var statusCodecs = serializer.NewCodecFactory(statusScheme) + +func init() { + statusScheme.AddUnversionedTypes(metav1.SchemeGroupVersion, + &metav1.Status{}, + ) +} diff --git a/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper_test.go b/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper_test.go new file mode 100644 index 0000000000..418b13f876 --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper_test.go @@ -0,0 +1,529 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package spdy + +import ( + "crypto/tls" + "crypto/x509" + "encoding/base64" + "fmt" + "io" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "sync/atomic" + "testing" + + "github.com/elazarl/goproxy" + + "k8s.io/apimachinery/pkg/util/httpstream" +) + +// be sure to unset environment variable https_proxy (if exported) before testing, otherwise the testing will fail unexpectedly. +func TestRoundTripAndNewConnection(t *testing.T) { + for _, redirect := range []bool{false, true} { + t.Run(fmt.Sprintf("redirect = %t", redirect), func(t *testing.T) { + localhostPool := x509.NewCertPool() + if !localhostPool.AppendCertsFromPEM(localhostCert) { + t.Errorf("error setting up localhostCert pool") + } + + httpsServerInvalidHostname := func(h http.Handler) *httptest.Server { + cert, err := tls.X509KeyPair(exampleCert, exampleKey) + if err != nil { + t.Errorf("https (invalid hostname): proxy_test: %v", err) + } + ts := httptest.NewUnstartedServer(h) + ts.TLS = &tls.Config{ + Certificates: []tls.Certificate{cert}, + } + ts.StartTLS() + return ts + } + + httpsServerValidHostname := func(h http.Handler) *httptest.Server { + cert, err := tls.X509KeyPair(localhostCert, localhostKey) + if err != nil { + t.Errorf("https (valid hostname): proxy_test: %v", err) + } + ts := httptest.NewUnstartedServer(h) + ts.TLS = &tls.Config{ + Certificates: []tls.Certificate{cert}, + } + ts.StartTLS() + return ts + } + + testCases := map[string]struct { + serverFunc func(http.Handler) *httptest.Server + proxyServerFunc func(http.Handler) *httptest.Server + proxyAuth *url.Userinfo + clientTLS *tls.Config + serverConnectionHeader string + serverUpgradeHeader string + serverStatusCode int + shouldError bool + }{ + "no headers": { + serverFunc: httptest.NewServer, + serverConnectionHeader: "", + serverUpgradeHeader: "", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: true, + }, + "no upgrade header": { + serverFunc: httptest.NewServer, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: true, + }, + "no connection header": { + serverFunc: httptest.NewServer, + serverConnectionHeader: "", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: true, + }, + "no switching protocol status code": { + serverFunc: httptest.NewServer, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusForbidden, + shouldError: true, + }, + "http": { + serverFunc: httptest.NewServer, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, + }, + "https (invalid hostname + InsecureSkipVerify)": { + serverFunc: httpsServerInvalidHostname, + clientTLS: &tls.Config{InsecureSkipVerify: true}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, + }, + "https (invalid hostname + hostname verification)": { + serverFunc: httpsServerInvalidHostname, + clientTLS: &tls.Config{InsecureSkipVerify: false}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: true, + }, + "https (valid hostname + RootCAs)": { + serverFunc: httpsServerValidHostname, + clientTLS: &tls.Config{RootCAs: localhostPool}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, + }, + "proxied http->http": { + serverFunc: httptest.NewServer, + proxyServerFunc: httptest.NewServer, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, + }, + "proxied https (invalid hostname + InsecureSkipVerify) -> http": { + serverFunc: httptest.NewServer, + proxyServerFunc: httpsServerInvalidHostname, + clientTLS: &tls.Config{InsecureSkipVerify: true}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, + }, + "proxied https with auth (invalid hostname + InsecureSkipVerify) -> http": { + serverFunc: httptest.NewServer, + proxyServerFunc: httpsServerInvalidHostname, + proxyAuth: url.UserPassword("proxyuser", "proxypasswd"), + clientTLS: &tls.Config{InsecureSkipVerify: true}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, + }, + "proxied https (invalid hostname + hostname verification) -> http": { + serverFunc: httptest.NewServer, + proxyServerFunc: httpsServerInvalidHostname, + clientTLS: &tls.Config{InsecureSkipVerify: false}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: true, // fails because the client doesn't trust the proxy + }, + "proxied https (valid hostname + RootCAs) -> http": { + serverFunc: httptest.NewServer, + proxyServerFunc: httpsServerValidHostname, + clientTLS: &tls.Config{RootCAs: localhostPool}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, + }, + "proxied https with auth (valid hostname + RootCAs) -> http": { + serverFunc: httptest.NewServer, + proxyServerFunc: httpsServerValidHostname, + proxyAuth: url.UserPassword("proxyuser", "proxypasswd"), + clientTLS: &tls.Config{RootCAs: localhostPool}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, + }, + "proxied https (invalid hostname + InsecureSkipVerify) -> https (invalid hostname)": { + serverFunc: httpsServerInvalidHostname, + proxyServerFunc: httpsServerInvalidHostname, + clientTLS: &tls.Config{InsecureSkipVerify: true}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, // works because the test proxy ignores TLS errors + }, + "proxied https with auth (invalid hostname + InsecureSkipVerify) -> https (invalid hostname)": { + serverFunc: httpsServerInvalidHostname, + proxyServerFunc: httpsServerInvalidHostname, + proxyAuth: url.UserPassword("proxyuser", "proxypasswd"), + clientTLS: &tls.Config{InsecureSkipVerify: true}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, // works because the test proxy ignores TLS errors + }, + "proxied https (invalid hostname + hostname verification) -> https (invalid hostname)": { + serverFunc: httpsServerInvalidHostname, + proxyServerFunc: httpsServerInvalidHostname, + clientTLS: &tls.Config{InsecureSkipVerify: false}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: true, // fails because the client doesn't trust the proxy + }, + "proxied https (valid hostname + RootCAs) -> https (valid hostname + RootCAs)": { + serverFunc: httpsServerValidHostname, + proxyServerFunc: httpsServerValidHostname, + clientTLS: &tls.Config{RootCAs: localhostPool}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, + }, + "proxied https with auth (valid hostname + RootCAs) -> https (valid hostname + RootCAs)": { + serverFunc: httpsServerValidHostname, + proxyServerFunc: httpsServerValidHostname, + proxyAuth: url.UserPassword("proxyuser", "proxypasswd"), + clientTLS: &tls.Config{RootCAs: localhostPool}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, + }, + } + + for k, testCase := range testCases { + server := testCase.serverFunc(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if testCase.shouldError { + if e, a := httpstream.HeaderUpgrade, req.Header.Get(httpstream.HeaderConnection); e != a { + t.Fatalf("%s: Expected connection=upgrade header, got '%s", k, a) + } + + w.Header().Set(httpstream.HeaderConnection, testCase.serverConnectionHeader) + w.Header().Set(httpstream.HeaderUpgrade, testCase.serverUpgradeHeader) + w.WriteHeader(testCase.serverStatusCode) + + return + } + + streamCh := make(chan httpstream.Stream) + + responseUpgrader := NewResponseUpgrader() + spdyConn := responseUpgrader.UpgradeResponse(w, req, func(s httpstream.Stream, replySent <-chan struct{}) error { + streamCh <- s + return nil + }) + if spdyConn == nil { + t.Fatalf("%s: unexpected nil spdyConn", k) + } + defer spdyConn.Close() + + stream := <-streamCh + io.Copy(stream, stream) + })) + defer server.Close() + + serverURL, err := url.Parse(server.URL) + if err != nil { + t.Fatalf("%s: Error creating request: %s", k, err) + } + req, err := http.NewRequest("GET", server.URL, nil) + if err != nil { + t.Fatalf("%s: Error creating request: %s", k, err) + } + + spdyTransport := NewSpdyRoundTripper(testCase.clientTLS, redirect, redirect) + + var proxierCalled bool + var proxyCalledWithHost string + var proxyCalledWithAuth bool + var proxyCalledWithAuthHeader string + if testCase.proxyServerFunc != nil { + proxyHandler := goproxy.NewProxyHttpServer() + + proxyHandler.OnRequest().HandleConnectFunc(func(host string, ctx *goproxy.ProxyCtx) (*goproxy.ConnectAction, string) { + proxyCalledWithHost = host + + proxyAuthHeaderName := "Proxy-Authorization" + _, proxyCalledWithAuth = ctx.Req.Header[proxyAuthHeaderName] + proxyCalledWithAuthHeader = ctx.Req.Header.Get(proxyAuthHeaderName) + return goproxy.OkConnect, host + }) + + proxy := testCase.proxyServerFunc(proxyHandler) + + spdyTransport.proxier = func(proxierReq *http.Request) (*url.URL, error) { + proxierCalled = true + proxyURL, err := url.Parse(proxy.URL) + if err != nil { + return nil, err + } + proxyURL.User = testCase.proxyAuth + return proxyURL, nil + } + defer proxy.Close() + } + + client := &http.Client{Transport: spdyTransport} + + resp, err := client.Do(req) + var conn httpstream.Connection + if err == nil { + conn, err = spdyTransport.NewConnection(resp) + } + haveErr := err != nil + if e, a := testCase.shouldError, haveErr; e != a { + t.Fatalf("%s: shouldError=%t, got %t: %v", k, e, a, err) + } + if testCase.shouldError { + continue + } + defer conn.Close() + + if resp.StatusCode != http.StatusSwitchingProtocols { + t.Fatalf("%s: expected http 101 switching protocols, got %d", k, resp.StatusCode) + } + + stream, err := conn.CreateStream(http.Header{}) + if err != nil { + t.Fatalf("%s: error creating client stream: %s", k, err) + } + + n, err := stream.Write([]byte("hello")) + if err != nil { + t.Fatalf("%s: error writing to stream: %s", k, err) + } + if n != 5 { + t.Fatalf("%s: Expected to write 5 bytes, but actually wrote %d", k, n) + } + + b := make([]byte, 5) + n, err = stream.Read(b) + if err != nil { + t.Fatalf("%s: error reading from stream: %s", k, err) + } + if n != 5 { + t.Fatalf("%s: Expected to read 5 bytes, but actually read %d", k, n) + } + if e, a := "hello", string(b[0:n]); e != a { + t.Fatalf("%s: expected '%s', got '%s'", k, e, a) + } + + if testCase.proxyServerFunc != nil { + if !proxierCalled { + t.Fatalf("%s: Expected to use a proxy but proxier in SpdyRoundTripper wasn't called", k) + } + if proxyCalledWithHost != serverURL.Host { + t.Fatalf("%s: Expected to see a call to the proxy for backend %q, got %q", k, serverURL.Host, proxyCalledWithHost) + } + } + + var expectedProxyAuth string + if testCase.proxyAuth != nil { + encodedCredentials := base64.StdEncoding.EncodeToString([]byte(testCase.proxyAuth.String())) + expectedProxyAuth = "Basic " + encodedCredentials + } + if len(expectedProxyAuth) == 0 && proxyCalledWithAuth { + t.Fatalf("%s: Proxy authorization unexpected, got %q", k, proxyCalledWithAuthHeader) + } + if proxyCalledWithAuthHeader != expectedProxyAuth { + t.Fatalf("%s: Expected to see a call to the proxy with credentials %q, got %q", k, testCase.proxyAuth, proxyCalledWithAuthHeader) + } + } + }) + } +} + +func TestRoundTripRedirects(t *testing.T) { + tests := []struct { + redirects int32 + expectSuccess bool + }{ + {0, true}, + {1, true}, + {9, true}, + {10, false}, + } + for _, test := range tests { + t.Run(fmt.Sprintf("with %d redirects", test.redirects), func(t *testing.T) { + var redirects int32 = 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if redirects < test.redirects { + atomic.AddInt32(&redirects, 1) + http.Redirect(w, req, "redirect", http.StatusFound) + return + } + streamCh := make(chan httpstream.Stream) + + responseUpgrader := NewResponseUpgrader() + spdyConn := responseUpgrader.UpgradeResponse(w, req, func(s httpstream.Stream, replySent <-chan struct{}) error { + streamCh <- s + return nil + }) + if spdyConn == nil { + t.Fatalf("unexpected nil spdyConn") + } + defer spdyConn.Close() + + stream := <-streamCh + io.Copy(stream, stream) + })) + defer server.Close() + + req, err := http.NewRequest("GET", server.URL, nil) + if err != nil { + t.Fatalf("Error creating request: %s", err) + } + + spdyTransport := NewSpdyRoundTripper(nil, true, true) + client := &http.Client{Transport: spdyTransport} + + resp, err := client.Do(req) + if test.expectSuccess { + if err != nil { + t.Fatalf("error calling Do: %v", err) + } + } else { + if err == nil { + t.Fatalf("expecting an error") + } else if !strings.Contains(err.Error(), "too many redirects") { + t.Fatalf("expecting too many redirects, got %v", err) + } + return + } + + conn, err := spdyTransport.NewConnection(resp) + if err != nil { + t.Fatalf("error calling NewConnection: %v", err) + } + defer conn.Close() + + if resp.StatusCode != http.StatusSwitchingProtocols { + t.Fatalf("expected http 101 switching protocols, got %d", resp.StatusCode) + } + + stream, err := conn.CreateStream(http.Header{}) + if err != nil { + t.Fatalf("error creating client stream: %s", err) + } + + n, err := stream.Write([]byte("hello")) + if err != nil { + t.Fatalf("error writing to stream: %s", err) + } + if n != 5 { + t.Fatalf("Expected to write 5 bytes, but actually wrote %d", n) + } + + b := make([]byte, 5) + n, err = stream.Read(b) + if err != nil { + t.Fatalf("error reading from stream: %s", err) + } + if n != 5 { + t.Fatalf("Expected to read 5 bytes, but actually read %d", n) + } + if e, a := "hello", string(b[0:n]); e != a { + t.Fatalf("expected '%s', got '%s'", e, a) + } + }) + } +} + +// exampleCert was generated from crypto/tls/generate_cert.go with the following command: +// go run generate_cert.go --rsa-bits 512 --host example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h +var exampleCert = []byte(`-----BEGIN CERTIFICATE----- +MIIBdzCCASGgAwIBAgIRAOVTAdPnfbS5V85mfS90TfIwDQYJKoZIhvcNAQELBQAw +EjEQMA4GA1UEChMHQWNtZSBDbzAgFw03MDAxMDEwMDAwMDBaGA8yMDg0MDEyOTE2 +MDAwMFowEjEQMA4GA1UEChMHQWNtZSBDbzBcMA0GCSqGSIb3DQEBAQUAA0sAMEgC +QQCoVSqeu8TBvF+70T7Jm4340YQNhds6IxjRoifenYodAO1dnKGrcbF266DJGunh +nIjQH7B12tduhl0fLK4Ezf7/AgMBAAGjUDBOMA4GA1UdDwEB/wQEAwICpDATBgNV +HSUEDDAKBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MBYGA1UdEQQPMA2CC2V4 +YW1wbGUuY29tMA0GCSqGSIb3DQEBCwUAA0EAk1kVa5uZ/AzwYDVcS9bpM/czwjjV +xq3VeSCfmNa2uNjbFvodmCRwZOHUvipAMGCUCV6j5vMrJ8eMj8tCQ36W9A== +-----END CERTIFICATE-----`) + +var exampleKey = []byte(`-----BEGIN RSA PRIVATE KEY----- +MIIBOgIBAAJBAKhVKp67xMG8X7vRPsmbjfjRhA2F2zojGNGiJ96dih0A7V2coatx +sXbroMka6eGciNAfsHXa126GXR8srgTN/v8CAwEAAQJASdzdD7vKsUwMIejGCUb1 +fAnLTPfAY3lFCa+CmR89nE22dAoRDv+5RbnBsZ58BazPNJHrsVPRlfXB3OQmSQr0 +SQIhANoJhs+xOJE/i8nJv0uAbzKyiD1YkvRkta0GpUOULyAVAiEAxaQus3E/SuqD +P7y5NeJnE7X6XkyC35zrsJRkz7orE8MCIHdDjsI8pjyNDeGqwUCDWE/a6DrmIDwe +emHSqMN2YvChAiEAnxLCM9NWaenOsaIoP+J1rDuvw+4499nJKVqGuVrSCRkCIEqK +4KSchPMc3x8M/uhw9oWTtKFmjA/PPh0FsWCdKrEy +-----END RSA PRIVATE KEY-----`) + +// localhostCert was generated from crypto/tls/generate_cert.go with the following command: +// go run generate_cert.go --rsa-bits 512 --host 127.0.0.1,::1,example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h +var localhostCert = []byte(`-----BEGIN CERTIFICATE----- +MIIBjzCCATmgAwIBAgIRAKpi2WmTcFrVjxrl5n5YDUEwDQYJKoZIhvcNAQELBQAw +EjEQMA4GA1UEChMHQWNtZSBDbzAgFw03MDAxMDEwMDAwMDBaGA8yMDg0MDEyOTE2 +MDAwMFowEjEQMA4GA1UEChMHQWNtZSBDbzBcMA0GCSqGSIb3DQEBAQUAA0sAMEgC +QQC9fEbRszP3t14Gr4oahV7zFObBI4TfA5i7YnlMXeLinb7MnvT4bkfOJzE6zktn +59zP7UiHs3l4YOuqrjiwM413AgMBAAGjaDBmMA4GA1UdDwEB/wQEAwICpDATBgNV +HSUEDDAKBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MC4GA1UdEQQnMCWCC2V4 +YW1wbGUuY29thwR/AAABhxAAAAAAAAAAAAAAAAAAAAABMA0GCSqGSIb3DQEBCwUA +A0EAUsVE6KMnza/ZbodLlyeMzdo7EM/5nb5ywyOxgIOCf0OOLHsPS9ueGLQX9HEG +//yjTXuhNcUugExIjM/AIwAZPQ== +-----END CERTIFICATE-----`) + +// localhostKey is the private key for localhostCert. +var localhostKey = []byte(`-----BEGIN RSA PRIVATE KEY----- +MIIBOwIBAAJBAL18RtGzM/e3XgavihqFXvMU5sEjhN8DmLtieUxd4uKdvsye9Phu +R84nMTrOS2fn3M/tSIezeXhg66quOLAzjXcCAwEAAQJBAKcRxH9wuglYLBdI/0OT +BLzfWPZCEw1vZmMR2FF1Fm8nkNOVDPleeVGTWoOEcYYlQbpTmkGSxJ6ya+hqRi6x +goECIQDx3+X49fwpL6B5qpJIJMyZBSCuMhH4B7JevhGGFENi3wIhAMiNJN5Q3UkL +IuSvv03kaPR5XVQ99/UeEetUgGvBcABpAiBJSBzVITIVCGkGc7d+RCf49KTCIklv +bGWObufAR8Ni4QIgWpILjW8dkGg8GOUZ0zaNA6Nvt6TIv2UWGJ4v5PoV98kCIQDx +rIiZs5QbKdycsv9gQJzwQAogC8o04X3Zz3dsoX+h4A== +-----END RSA PRIVATE KEY-----`) diff --git a/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go b/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go new file mode 100644 index 0000000000..045d214d2b --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go @@ -0,0 +1,107 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package spdy + +import ( + "bufio" + "fmt" + "io" + "net" + "net/http" + "strings" + "sync/atomic" + + "k8s.io/apimachinery/pkg/util/httpstream" + "k8s.io/apimachinery/pkg/util/runtime" +) + +const HeaderSpdy31 = "SPDY/3.1" + +// responseUpgrader knows how to upgrade HTTP responses. It +// implements the httpstream.ResponseUpgrader interface. +type responseUpgrader struct { +} + +// connWrapper is used to wrap a hijacked connection and its bufio.Reader. All +// calls will be handled directly by the underlying net.Conn with the exception +// of Read and Close calls, which will consider data in the bufio.Reader. This +// ensures that data already inside the used bufio.Reader instance is also +// read. +type connWrapper struct { + net.Conn + closed int32 + bufReader *bufio.Reader +} + +func (w *connWrapper) Read(b []byte) (n int, err error) { + if atomic.LoadInt32(&w.closed) == 1 { + return 0, io.EOF + } + return w.bufReader.Read(b) +} + +func (w *connWrapper) Close() error { + err := w.Conn.Close() + atomic.StoreInt32(&w.closed, 1) + return err +} + +// NewResponseUpgrader returns a new httpstream.ResponseUpgrader that is +// capable of upgrading HTTP responses using SPDY/3.1 via the +// spdystream package. +func NewResponseUpgrader() httpstream.ResponseUpgrader { + return responseUpgrader{} +} + +// UpgradeResponse upgrades an HTTP response to one that supports multiplexed +// streams. newStreamHandler will be called synchronously whenever the +// other end of the upgraded connection creates a new stream. +func (u responseUpgrader) UpgradeResponse(w http.ResponseWriter, req *http.Request, newStreamHandler httpstream.NewStreamHandler) httpstream.Connection { + connectionHeader := strings.ToLower(req.Header.Get(httpstream.HeaderConnection)) + upgradeHeader := strings.ToLower(req.Header.Get(httpstream.HeaderUpgrade)) + if !strings.Contains(connectionHeader, strings.ToLower(httpstream.HeaderUpgrade)) || !strings.Contains(upgradeHeader, strings.ToLower(HeaderSpdy31)) { + errorMsg := fmt.Sprintf("unable to upgrade: missing upgrade headers in request: %#v", req.Header) + http.Error(w, errorMsg, http.StatusBadRequest) + return nil + } + + hijacker, ok := w.(http.Hijacker) + if !ok { + errorMsg := fmt.Sprintf("unable to upgrade: unable to hijack response") + http.Error(w, errorMsg, http.StatusInternalServerError) + return nil + } + + w.Header().Add(httpstream.HeaderConnection, httpstream.HeaderUpgrade) + w.Header().Add(httpstream.HeaderUpgrade, HeaderSpdy31) + w.WriteHeader(http.StatusSwitchingProtocols) + + conn, bufrw, err := hijacker.Hijack() + if err != nil { + runtime.HandleError(fmt.Errorf("unable to upgrade: error hijacking response: %v", err)) + return nil + } + + connWithBuf := &connWrapper{Conn: conn, bufReader: bufrw.Reader} + spdyConn, err := NewServerConnection(connWithBuf, newStreamHandler) + if err != nil { + runtime.HandleError(fmt.Errorf("unable to upgrade: error creating SPDY server connection: %v", err)) + return nil + } + + return spdyConn +} diff --git a/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade_test.go b/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade_test.go new file mode 100644 index 0000000000..5a514dd5bf --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade_test.go @@ -0,0 +1,93 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package spdy + +import ( + "net/http" + "net/http/httptest" + "testing" +) + +func TestUpgradeResponse(t *testing.T) { + testCases := []struct { + connectionHeader string + upgradeHeader string + shouldError bool + }{ + { + connectionHeader: "", + upgradeHeader: "", + shouldError: true, + }, + { + connectionHeader: "Upgrade", + upgradeHeader: "", + shouldError: true, + }, + { + connectionHeader: "", + upgradeHeader: "SPDY/3.1", + shouldError: true, + }, + { + connectionHeader: "Upgrade", + upgradeHeader: "SPDY/3.1", + shouldError: false, + }, + } + + for i, testCase := range testCases { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + upgrader := NewResponseUpgrader() + conn := upgrader.UpgradeResponse(w, req, nil) + haveErr := conn == nil + if e, a := testCase.shouldError, haveErr; e != a { + t.Fatalf("%d: expected shouldErr=%t, got %t", i, testCase.shouldError, haveErr) + } + if haveErr { + return + } + if conn == nil { + t.Fatalf("%d: unexpected nil conn", i) + } + defer conn.Close() + })) + defer server.Close() + + req, err := http.NewRequest("GET", server.URL, nil) + if err != nil { + t.Fatalf("%d: error creating request: %s", i, err) + } + + req.Header.Set("Connection", testCase.connectionHeader) + req.Header.Set("Upgrade", testCase.upgradeHeader) + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + t.Fatalf("%d: unexpected non-nil err from client.Do: %s", i, err) + } + + if testCase.shouldError { + continue + } + + if resp.StatusCode != http.StatusSwitchingProtocols { + t.Fatalf("%d: expected status 101 switching protocols, got %d", i, resp.StatusCode) + } + } +} diff --git a/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/doc.go b/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/doc.go new file mode 100644 index 0000000000..5893df5bd2 --- /dev/null +++ b/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package httpstream adds multiplexed streaming support to HTTP requests and +// responses via connection upgrades. +package httpstream // import "k8s.io/apimachinery/pkg/util/httpstream" diff --git a/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/httpstream.go b/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/httpstream.go new file mode 100644 index 0000000000..50d9a366f3 --- /dev/null +++ b/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/httpstream.go @@ -0,0 +1,149 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package httpstream + +import ( + "fmt" + "io" + "net/http" + "strings" + "time" +) + +const ( + HeaderConnection = "Connection" + HeaderUpgrade = "Upgrade" + HeaderProtocolVersion = "X-Stream-Protocol-Version" + HeaderAcceptedProtocolVersions = "X-Accepted-Stream-Protocol-Versions" +) + +// NewStreamHandler defines a function that is called when a new Stream is +// received. If no error is returned, the Stream is accepted; otherwise, +// the stream is rejected. After the reply frame has been sent, replySent is closed. +type NewStreamHandler func(stream Stream, replySent <-chan struct{}) error + +// NoOpNewStreamHandler is a stream handler that accepts a new stream and +// performs no other logic. +func NoOpNewStreamHandler(stream Stream, replySent <-chan struct{}) error { return nil } + +// Dialer knows how to open a streaming connection to a server. +type Dialer interface { + + // Dial opens a streaming connection to a server using one of the protocols + // specified (in order of most preferred to least preferred). + Dial(protocols ...string) (Connection, string, error) +} + +// UpgradeRoundTripper is a type of http.RoundTripper that is able to upgrade +// HTTP requests to support multiplexed bidirectional streams. After RoundTrip() +// is invoked, if the upgrade is successful, clients may retrieve the upgraded +// connection by calling UpgradeRoundTripper.Connection(). +type UpgradeRoundTripper interface { + http.RoundTripper + // NewConnection validates the response and creates a new Connection. + NewConnection(resp *http.Response) (Connection, error) +} + +// ResponseUpgrader knows how to upgrade HTTP requests and responses to +// add streaming support to them. +type ResponseUpgrader interface { + // UpgradeResponse upgrades an HTTP response to one that supports multiplexed + // streams. newStreamHandler will be called asynchronously whenever the + // other end of the upgraded connection creates a new stream. + UpgradeResponse(w http.ResponseWriter, req *http.Request, newStreamHandler NewStreamHandler) Connection +} + +// Connection represents an upgraded HTTP connection. +type Connection interface { + // CreateStream creates a new Stream with the supplied headers. + CreateStream(headers http.Header) (Stream, error) + // Close resets all streams and closes the connection. + Close() error + // CloseChan returns a channel that is closed when the underlying connection is closed. + CloseChan() <-chan bool + // SetIdleTimeout sets the amount of time the connection may remain idle before + // it is automatically closed. + SetIdleTimeout(timeout time.Duration) +} + +// Stream represents a bidirectional communications channel that is part of an +// upgraded connection. +type Stream interface { + io.ReadWriteCloser + // Reset closes both directions of the stream, indicating that neither client + // or server can use it any more. + Reset() error + // Headers returns the headers used to create the stream. + Headers() http.Header + // Identifier returns the stream's ID. + Identifier() uint32 +} + +// IsUpgradeRequest returns true if the given request is a connection upgrade request +func IsUpgradeRequest(req *http.Request) bool { + for _, h := range req.Header[http.CanonicalHeaderKey(HeaderConnection)] { + if strings.Contains(strings.ToLower(h), strings.ToLower(HeaderUpgrade)) { + return true + } + } + return false +} + +func negotiateProtocol(clientProtocols, serverProtocols []string) string { + for i := range clientProtocols { + for j := range serverProtocols { + if clientProtocols[i] == serverProtocols[j] { + return clientProtocols[i] + } + } + } + return "" +} + +// Handshake performs a subprotocol negotiation. If the client did request a +// subprotocol, Handshake will select the first common value found in +// serverProtocols. If a match is found, Handshake adds a response header +// indicating the chosen subprotocol. If no match is found, HTTP forbidden is +// returned, along with a response header containing the list of protocols the +// server can accept. +func Handshake(req *http.Request, w http.ResponseWriter, serverProtocols []string) (string, error) { + clientProtocols := req.Header[http.CanonicalHeaderKey(HeaderProtocolVersion)] + if len(clientProtocols) == 0 { + // Kube 1.0 clients didn't support subprotocol negotiation. + // TODO require clientProtocols once Kube 1.0 is no longer supported + return "", nil + } + + if len(serverProtocols) == 0 { + // Kube 1.0 servers didn't support subprotocol negotiation. This is mainly for testing. + // TODO require serverProtocols once Kube 1.0 is no longer supported + return "", nil + } + + negotiatedProtocol := negotiateProtocol(clientProtocols, serverProtocols) + if len(negotiatedProtocol) == 0 { + for i := range serverProtocols { + w.Header().Add(HeaderAcceptedProtocolVersions, serverProtocols[i]) + } + err := fmt.Errorf("unable to upgrade: unable to negotiate protocol: client supports %v, server accepts %v", clientProtocols, serverProtocols) + http.Error(w, err.Error(), http.StatusForbidden) + return "", err + } + + w.Header().Add(HeaderProtocolVersion, negotiatedProtocol) + return negotiatedProtocol, nil +} diff --git a/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/httpstream_test.go b/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/httpstream_test.go new file mode 100644 index 0000000000..f7f9a3ebf4 --- /dev/null +++ b/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/httpstream_test.go @@ -0,0 +1,125 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package httpstream + +import ( + "net/http" + "reflect" + "testing" +) + +type responseWriter struct { + header http.Header + statusCode *int +} + +func newResponseWriter() *responseWriter { + return &responseWriter{ + header: make(http.Header), + } +} + +func (r *responseWriter) Header() http.Header { + return r.header +} + +func (r *responseWriter) WriteHeader(code int) { + r.statusCode = &code +} + +func (r *responseWriter) Write([]byte) (int, error) { + return 0, nil +} + +func TestHandshake(t *testing.T) { + tests := map[string]struct { + clientProtocols []string + serverProtocols []string + expectedProtocol string + expectError bool + }{ + "no client protocols": { + clientProtocols: []string{}, + serverProtocols: []string{"a", "b"}, + expectedProtocol: "", + }, + "no common protocol": { + clientProtocols: []string{"c"}, + serverProtocols: []string{"a", "b"}, + expectedProtocol: "", + expectError: true, + }, + "common protocol": { + clientProtocols: []string{"b"}, + serverProtocols: []string{"a", "b"}, + expectedProtocol: "b", + }, + } + + for name, test := range tests { + req, err := http.NewRequest("GET", "http://www.example.com/", nil) + if err != nil { + t.Fatalf("%s: error creating request: %v", name, err) + } + + for _, p := range test.clientProtocols { + req.Header.Add(HeaderProtocolVersion, p) + } + + w := newResponseWriter() + negotiated, err := Handshake(req, w, test.serverProtocols) + + // verify negotiated protocol + if e, a := test.expectedProtocol, negotiated; e != a { + t.Errorf("%s: protocol: expected %q, got %q", name, e, a) + } + + if test.expectError { + if err == nil { + t.Errorf("%s: expected error but did not get one", name) + } + if w.statusCode == nil { + t.Errorf("%s: expected w.statusCode to be set", name) + } else if e, a := http.StatusForbidden, *w.statusCode; e != a { + t.Errorf("%s: w.statusCode: expected %d, got %d", name, e, a) + } + if e, a := test.serverProtocols, w.Header()[HeaderAcceptedProtocolVersions]; !reflect.DeepEqual(e, a) { + t.Errorf("%s: accepted server protocols: expected %v, got %v", name, e, a) + } + continue + } + if !test.expectError && err != nil { + t.Errorf("%s: unexpected error: %v", name, err) + continue + } + if w.statusCode != nil { + t.Errorf("%s: unexpected non-nil w.statusCode: %d", name, w.statusCode) + } + + if len(test.expectedProtocol) == 0 { + if len(w.Header()[HeaderProtocolVersion]) > 0 { + t.Errorf("%s: unexpected protocol version response header: %s", name, w.Header()[HeaderProtocolVersion]) + } + continue + } + + // verify response headers + if e, a := []string{test.expectedProtocol}, w.Header()[HeaderProtocolVersion]; !reflect.DeepEqual(e, a) { + t.Errorf("%s: protocol response header: expected %v, got %v", name, e, a) + } + } +} diff --git a/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go b/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go new file mode 100644 index 0000000000..9d222faa89 --- /dev/null +++ b/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection.go @@ -0,0 +1,145 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package spdy + +import ( + "net" + "net/http" + "sync" + "time" + + "github.com/docker/spdystream" + "k8s.io/apimachinery/pkg/util/httpstream" + "k8s.io/klog" +) + +// connection maintains state about a spdystream.Connection and its associated +// streams. +type connection struct { + conn *spdystream.Connection + streams []httpstream.Stream + streamLock sync.Mutex + newStreamHandler httpstream.NewStreamHandler +} + +// NewClientConnection creates a new SPDY client connection. +func NewClientConnection(conn net.Conn) (httpstream.Connection, error) { + spdyConn, err := spdystream.NewConnection(conn, false) + if err != nil { + defer conn.Close() + return nil, err + } + + return newConnection(spdyConn, httpstream.NoOpNewStreamHandler), nil +} + +// NewServerConnection creates a new SPDY server connection. newStreamHandler +// will be invoked when the server receives a newly created stream from the +// client. +func NewServerConnection(conn net.Conn, newStreamHandler httpstream.NewStreamHandler) (httpstream.Connection, error) { + spdyConn, err := spdystream.NewConnection(conn, true) + if err != nil { + defer conn.Close() + return nil, err + } + + return newConnection(spdyConn, newStreamHandler), nil +} + +// newConnection returns a new connection wrapping conn. newStreamHandler +// will be invoked when the server receives a newly created stream from the +// client. +func newConnection(conn *spdystream.Connection, newStreamHandler httpstream.NewStreamHandler) httpstream.Connection { + c := &connection{conn: conn, newStreamHandler: newStreamHandler} + go conn.Serve(c.newSpdyStream) + return c +} + +// createStreamResponseTimeout indicates how long to wait for the other side to +// acknowledge the new stream before timing out. +const createStreamResponseTimeout = 30 * time.Second + +// Close first sends a reset for all of the connection's streams, and then +// closes the underlying spdystream.Connection. +func (c *connection) Close() error { + c.streamLock.Lock() + for _, s := range c.streams { + // calling Reset instead of Close ensures that all streams are fully torn down + s.Reset() + } + c.streams = make([]httpstream.Stream, 0) + c.streamLock.Unlock() + + // now that all streams are fully torn down, it's safe to call close on the underlying connection, + // which should be able to terminate immediately at this point, instead of waiting for any + // remaining graceful stream termination. + return c.conn.Close() +} + +// CreateStream creates a new stream with the specified headers and registers +// it with the connection. +func (c *connection) CreateStream(headers http.Header) (httpstream.Stream, error) { + stream, err := c.conn.CreateStream(headers, nil, false) + if err != nil { + return nil, err + } + if err = stream.WaitTimeout(createStreamResponseTimeout); err != nil { + return nil, err + } + + c.registerStream(stream) + return stream, nil +} + +// registerStream adds the stream s to the connection's list of streams that +// it owns. +func (c *connection) registerStream(s httpstream.Stream) { + c.streamLock.Lock() + c.streams = append(c.streams, s) + c.streamLock.Unlock() +} + +// CloseChan returns a channel that, when closed, indicates that the underlying +// spdystream.Connection has been closed. +func (c *connection) CloseChan() <-chan bool { + return c.conn.CloseChan() +} + +// newSpdyStream is the internal new stream handler used by spdystream.Connection.Serve. +// It calls connection's newStreamHandler, giving it the opportunity to accept or reject +// the stream. If newStreamHandler returns an error, the stream is rejected. If not, the +// stream is accepted and registered with the connection. +func (c *connection) newSpdyStream(stream *spdystream.Stream) { + replySent := make(chan struct{}) + err := c.newStreamHandler(stream, replySent) + rejectStream := (err != nil) + if rejectStream { + klog.Warningf("Stream rejected: %v", err) + stream.Reset() + return + } + + c.registerStream(stream) + stream.SendReply(http.Header{}, rejectStream) + close(replySent) +} + +// SetIdleTimeout sets the amount of time the connection may remain idle before +// it is automatically closed. +func (c *connection) SetIdleTimeout(timeout time.Duration) { + c.conn.SetIdleTimeout(timeout) +} diff --git a/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection_test.go b/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection_test.go new file mode 100644 index 0000000000..f94e2b3ba4 --- /dev/null +++ b/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/spdy/connection_test.go @@ -0,0 +1,164 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package spdy + +import ( + "io" + "net" + "net/http" + "sync" + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/httpstream" +) + +func runProxy(t *testing.T, backendUrl string, proxyUrl chan<- string, proxyDone chan<- struct{}) { + listener, err := net.Listen("tcp4", "localhost:0") + if err != nil { + t.Fatalf("error listening: %v", err) + } + defer listener.Close() + + proxyUrl <- listener.Addr().String() + + clientConn, err := listener.Accept() + if err != nil { + t.Errorf("proxy: error accepting client connection: %v", err) + return + } + + backendConn, err := net.Dial("tcp4", backendUrl) + if err != nil { + t.Errorf("proxy: error dialing backend: %v", err) + return + } + defer backendConn.Close() + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + io.Copy(backendConn, clientConn) + }() + + go func() { + defer wg.Done() + io.Copy(clientConn, backendConn) + }() + + wg.Wait() + + proxyDone <- struct{}{} +} + +func runServer(t *testing.T, backendUrl chan<- string, serverDone chan<- struct{}) { + listener, err := net.Listen("tcp4", "localhost:0") + if err != nil { + t.Fatalf("server: error listening: %v", err) + } + defer listener.Close() + + backendUrl <- listener.Addr().String() + + conn, err := listener.Accept() + if err != nil { + t.Errorf("server: error accepting connection: %v", err) + return + } + + streamChan := make(chan httpstream.Stream) + replySentChan := make(chan (<-chan struct{})) + spdyConn, err := NewServerConnection(conn, func(stream httpstream.Stream, replySent <-chan struct{}) error { + streamChan <- stream + replySentChan <- replySent + return nil + }) + if err != nil { + t.Errorf("server: error creating spdy connection: %v", err) + return + } + + stream := <-streamChan + replySent := <-replySentChan + <-replySent + + buf := make([]byte, 1) + _, err = stream.Read(buf) + if err != io.EOF { + t.Errorf("server: unexpected read error: %v", err) + return + } + + <-spdyConn.CloseChan() + raw := spdyConn.(*connection).conn + if err := raw.Wait(15 * time.Second); err != nil { + t.Errorf("server: timed out waiting for connection closure: %v", err) + } + + serverDone <- struct{}{} +} + +func TestConnectionCloseIsImmediateThroughAProxy(t *testing.T) { + serverDone := make(chan struct{}) + backendUrlChan := make(chan string) + go runServer(t, backendUrlChan, serverDone) + backendUrl := <-backendUrlChan + + proxyDone := make(chan struct{}) + proxyUrlChan := make(chan string) + go runProxy(t, backendUrl, proxyUrlChan, proxyDone) + proxyUrl := <-proxyUrlChan + + conn, err := net.Dial("tcp4", proxyUrl) + if err != nil { + t.Fatalf("client: error connecting to proxy: %v", err) + } + + spdyConn, err := NewClientConnection(conn) + if err != nil { + t.Fatalf("client: error creating spdy connection: %v", err) + } + + if _, err := spdyConn.CreateStream(http.Header{}); err != nil { + t.Fatalf("client: error creating stream: %v", err) + } + + spdyConn.Close() + raw := spdyConn.(*connection).conn + if err := raw.Wait(15 * time.Second); err != nil { + t.Fatalf("client: timed out waiting for connection closure: %v", err) + } + + expired := time.NewTimer(15 * time.Second) + defer expired.Stop() + i := 0 + for { + select { + case <-expired.C: + t.Fatalf("timed out waiting for proxy and/or server closure") + case <-serverDone: + i++ + case <-proxyDone: + i++ + } + if i == 2 { + break + } + } +} diff --git a/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go b/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go new file mode 100644 index 0000000000..2699597e7a --- /dev/null +++ b/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper.go @@ -0,0 +1,335 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package spdy + +import ( + "bufio" + "bytes" + "context" + "crypto/tls" + "encoding/base64" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "net/http/httputil" + "net/url" + "strings" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/util/httpstream" + utilnet "k8s.io/apimachinery/pkg/util/net" + "k8s.io/apimachinery/third_party/forked/golang/netutil" +) + +// SpdyRoundTripper knows how to upgrade an HTTP request to one that supports +// multiplexed streams. After RoundTrip() is invoked, Conn will be set +// and usable. SpdyRoundTripper implements the UpgradeRoundTripper interface. +type SpdyRoundTripper struct { + //tlsConfig holds the TLS configuration settings to use when connecting + //to the remote server. + tlsConfig *tls.Config + + /* TODO according to http://golang.org/pkg/net/http/#RoundTripper, a RoundTripper + must be safe for use by multiple concurrent goroutines. If this is absolutely + necessary, we could keep a map from http.Request to net.Conn. In practice, + a client will create an http.Client, set the transport to a new insteace of + SpdyRoundTripper, and use it a single time, so this hopefully won't be an issue. + */ + // conn is the underlying network connection to the remote server. + conn net.Conn + + // Dialer is the dialer used to connect. Used if non-nil. + Dialer *net.Dialer + + // proxier knows which proxy to use given a request, defaults to http.ProxyFromEnvironment + // Used primarily for mocking the proxy discovery in tests. + proxier func(req *http.Request) (*url.URL, error) + + // followRedirects indicates if the round tripper should examine responses for redirects and + // follow them. + followRedirects bool + // requireSameHostRedirects restricts redirect following to only follow redirects to the same host + // as the original request. + requireSameHostRedirects bool +} + +var _ utilnet.TLSClientConfigHolder = &SpdyRoundTripper{} +var _ httpstream.UpgradeRoundTripper = &SpdyRoundTripper{} +var _ utilnet.Dialer = &SpdyRoundTripper{} + +// NewRoundTripper creates a new SpdyRoundTripper that will use +// the specified tlsConfig. +func NewRoundTripper(tlsConfig *tls.Config, followRedirects, requireSameHostRedirects bool) httpstream.UpgradeRoundTripper { + return NewSpdyRoundTripper(tlsConfig, followRedirects, requireSameHostRedirects) +} + +// NewSpdyRoundTripper creates a new SpdyRoundTripper that will use +// the specified tlsConfig. This function is mostly meant for unit tests. +func NewSpdyRoundTripper(tlsConfig *tls.Config, followRedirects, requireSameHostRedirects bool) *SpdyRoundTripper { + return &SpdyRoundTripper{ + tlsConfig: tlsConfig, + followRedirects: followRedirects, + requireSameHostRedirects: requireSameHostRedirects, + } +} + +// TLSClientConfig implements pkg/util/net.TLSClientConfigHolder for proper TLS checking during +// proxying with a spdy roundtripper. +func (s *SpdyRoundTripper) TLSClientConfig() *tls.Config { + return s.tlsConfig +} + +// Dial implements k8s.io/apimachinery/pkg/util/net.Dialer. +func (s *SpdyRoundTripper) Dial(req *http.Request) (net.Conn, error) { + conn, err := s.dial(req) + if err != nil { + return nil, err + } + + if err := req.Write(conn); err != nil { + conn.Close() + return nil, err + } + + return conn, nil +} + +// dial dials the host specified by req, using TLS if appropriate, optionally +// using a proxy server if one is configured via environment variables. +func (s *SpdyRoundTripper) dial(req *http.Request) (net.Conn, error) { + proxier := s.proxier + if proxier == nil { + proxier = utilnet.NewProxierWithNoProxyCIDR(http.ProxyFromEnvironment) + } + proxyURL, err := proxier(req) + if err != nil { + return nil, err + } + + if proxyURL == nil { + return s.dialWithoutProxy(req.Context(), req.URL) + } + + // ensure we use a canonical host with proxyReq + targetHost := netutil.CanonicalAddr(req.URL) + + // proxying logic adapted from http://blog.h6t.eu/post/74098062923/golang-websocket-with-http-proxy-support + proxyReq := http.Request{ + Method: "CONNECT", + URL: &url.URL{}, + Host: targetHost, + } + + if pa := s.proxyAuth(proxyURL); pa != "" { + proxyReq.Header = http.Header{} + proxyReq.Header.Set("Proxy-Authorization", pa) + } + + proxyDialConn, err := s.dialWithoutProxy(req.Context(), proxyURL) + if err != nil { + return nil, err + } + + proxyClientConn := httputil.NewProxyClientConn(proxyDialConn, nil) + _, err = proxyClientConn.Do(&proxyReq) + if err != nil && err != httputil.ErrPersistEOF { + return nil, err + } + + rwc, _ := proxyClientConn.Hijack() + + if req.URL.Scheme != "https" { + return rwc, nil + } + + host, _, err := net.SplitHostPort(targetHost) + if err != nil { + return nil, err + } + + tlsConfig := s.tlsConfig + switch { + case tlsConfig == nil: + tlsConfig = &tls.Config{ServerName: host} + case len(tlsConfig.ServerName) == 0: + tlsConfig = tlsConfig.Clone() + tlsConfig.ServerName = host + } + + tlsConn := tls.Client(rwc, tlsConfig) + + // need to manually call Handshake() so we can call VerifyHostname() below + if err := tlsConn.Handshake(); err != nil { + return nil, err + } + + // Return if we were configured to skip validation + if tlsConfig.InsecureSkipVerify { + return tlsConn, nil + } + + if err := tlsConn.VerifyHostname(tlsConfig.ServerName); err != nil { + return nil, err + } + + return tlsConn, nil +} + +// dialWithoutProxy dials the host specified by url, using TLS if appropriate. +func (s *SpdyRoundTripper) dialWithoutProxy(ctx context.Context, url *url.URL) (net.Conn, error) { + dialAddr := netutil.CanonicalAddr(url) + + if url.Scheme == "http" { + if s.Dialer == nil { + var d net.Dialer + return d.DialContext(ctx, "tcp", dialAddr) + } else { + return s.Dialer.DialContext(ctx, "tcp", dialAddr) + } + } + + // TODO validate the TLSClientConfig is set up? + var conn *tls.Conn + var err error + if s.Dialer == nil { + conn, err = tls.Dial("tcp", dialAddr, s.tlsConfig) + } else { + conn, err = tls.DialWithDialer(s.Dialer, "tcp", dialAddr, s.tlsConfig) + } + if err != nil { + return nil, err + } + + // Return if we were configured to skip validation + if s.tlsConfig != nil && s.tlsConfig.InsecureSkipVerify { + return conn, nil + } + + host, _, err := net.SplitHostPort(dialAddr) + if err != nil { + return nil, err + } + if s.tlsConfig != nil && len(s.tlsConfig.ServerName) > 0 { + host = s.tlsConfig.ServerName + } + err = conn.VerifyHostname(host) + if err != nil { + return nil, err + } + + return conn, nil +} + +// proxyAuth returns, for a given proxy URL, the value to be used for the Proxy-Authorization header +func (s *SpdyRoundTripper) proxyAuth(proxyURL *url.URL) string { + if proxyURL == nil || proxyURL.User == nil { + return "" + } + credentials := proxyURL.User.String() + encodedAuth := base64.StdEncoding.EncodeToString([]byte(credentials)) + return fmt.Sprintf("Basic %s", encodedAuth) +} + +// RoundTrip executes the Request and upgrades it. After a successful upgrade, +// clients may call SpdyRoundTripper.Connection() to retrieve the upgraded +// connection. +func (s *SpdyRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + header := utilnet.CloneHeader(req.Header) + header.Add(httpstream.HeaderConnection, httpstream.HeaderUpgrade) + header.Add(httpstream.HeaderUpgrade, HeaderSpdy31) + + var ( + conn net.Conn + rawResponse []byte + err error + ) + + if s.followRedirects { + conn, rawResponse, err = utilnet.ConnectWithRedirects(req.Method, req.URL, header, req.Body, s, s.requireSameHostRedirects) + } else { + clone := utilnet.CloneRequest(req) + clone.Header = header + conn, err = s.Dial(clone) + } + if err != nil { + return nil, err + } + + responseReader := bufio.NewReader( + io.MultiReader( + bytes.NewBuffer(rawResponse), + conn, + ), + ) + + resp, err := http.ReadResponse(responseReader, nil) + if err != nil { + if conn != nil { + conn.Close() + } + return nil, err + } + + s.conn = conn + + return resp, nil +} + +// NewConnection validates the upgrade response, creating and returning a new +// httpstream.Connection if there were no errors. +func (s *SpdyRoundTripper) NewConnection(resp *http.Response) (httpstream.Connection, error) { + connectionHeader := strings.ToLower(resp.Header.Get(httpstream.HeaderConnection)) + upgradeHeader := strings.ToLower(resp.Header.Get(httpstream.HeaderUpgrade)) + if (resp.StatusCode != http.StatusSwitchingProtocols) || !strings.Contains(connectionHeader, strings.ToLower(httpstream.HeaderUpgrade)) || !strings.Contains(upgradeHeader, strings.ToLower(HeaderSpdy31)) { + defer resp.Body.Close() + responseError := "" + responseErrorBytes, err := ioutil.ReadAll(resp.Body) + if err != nil { + responseError = "unable to read error from server response" + } else { + // TODO: I don't belong here, I should be abstracted from this class + if obj, _, err := statusCodecs.UniversalDecoder().Decode(responseErrorBytes, nil, &metav1.Status{}); err == nil { + if status, ok := obj.(*metav1.Status); ok { + return nil, &apierrors.StatusError{ErrStatus: *status} + } + } + responseError = string(responseErrorBytes) + responseError = strings.TrimSpace(responseError) + } + + return nil, fmt.Errorf("unable to upgrade connection: %s", responseError) + } + + return NewClientConnection(s.conn) +} + +// statusScheme is private scheme for the decoding here until someone fixes the TODO in NewConnection +var statusScheme = runtime.NewScheme() + +// ParameterCodec knows about query parameters used with the meta v1 API spec. +var statusCodecs = serializer.NewCodecFactory(statusScheme) + +func init() { + statusScheme.AddUnversionedTypes(metav1.SchemeGroupVersion, + &metav1.Status{}, + ) +} diff --git a/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper_test.go b/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper_test.go new file mode 100644 index 0000000000..418b13f876 --- /dev/null +++ b/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/spdy/roundtripper_test.go @@ -0,0 +1,529 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package spdy + +import ( + "crypto/tls" + "crypto/x509" + "encoding/base64" + "fmt" + "io" + "net/http" + "net/http/httptest" + "net/url" + "strings" + "sync/atomic" + "testing" + + "github.com/elazarl/goproxy" + + "k8s.io/apimachinery/pkg/util/httpstream" +) + +// be sure to unset environment variable https_proxy (if exported) before testing, otherwise the testing will fail unexpectedly. +func TestRoundTripAndNewConnection(t *testing.T) { + for _, redirect := range []bool{false, true} { + t.Run(fmt.Sprintf("redirect = %t", redirect), func(t *testing.T) { + localhostPool := x509.NewCertPool() + if !localhostPool.AppendCertsFromPEM(localhostCert) { + t.Errorf("error setting up localhostCert pool") + } + + httpsServerInvalidHostname := func(h http.Handler) *httptest.Server { + cert, err := tls.X509KeyPair(exampleCert, exampleKey) + if err != nil { + t.Errorf("https (invalid hostname): proxy_test: %v", err) + } + ts := httptest.NewUnstartedServer(h) + ts.TLS = &tls.Config{ + Certificates: []tls.Certificate{cert}, + } + ts.StartTLS() + return ts + } + + httpsServerValidHostname := func(h http.Handler) *httptest.Server { + cert, err := tls.X509KeyPair(localhostCert, localhostKey) + if err != nil { + t.Errorf("https (valid hostname): proxy_test: %v", err) + } + ts := httptest.NewUnstartedServer(h) + ts.TLS = &tls.Config{ + Certificates: []tls.Certificate{cert}, + } + ts.StartTLS() + return ts + } + + testCases := map[string]struct { + serverFunc func(http.Handler) *httptest.Server + proxyServerFunc func(http.Handler) *httptest.Server + proxyAuth *url.Userinfo + clientTLS *tls.Config + serverConnectionHeader string + serverUpgradeHeader string + serverStatusCode int + shouldError bool + }{ + "no headers": { + serverFunc: httptest.NewServer, + serverConnectionHeader: "", + serverUpgradeHeader: "", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: true, + }, + "no upgrade header": { + serverFunc: httptest.NewServer, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: true, + }, + "no connection header": { + serverFunc: httptest.NewServer, + serverConnectionHeader: "", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: true, + }, + "no switching protocol status code": { + serverFunc: httptest.NewServer, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusForbidden, + shouldError: true, + }, + "http": { + serverFunc: httptest.NewServer, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, + }, + "https (invalid hostname + InsecureSkipVerify)": { + serverFunc: httpsServerInvalidHostname, + clientTLS: &tls.Config{InsecureSkipVerify: true}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, + }, + "https (invalid hostname + hostname verification)": { + serverFunc: httpsServerInvalidHostname, + clientTLS: &tls.Config{InsecureSkipVerify: false}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: true, + }, + "https (valid hostname + RootCAs)": { + serverFunc: httpsServerValidHostname, + clientTLS: &tls.Config{RootCAs: localhostPool}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, + }, + "proxied http->http": { + serverFunc: httptest.NewServer, + proxyServerFunc: httptest.NewServer, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, + }, + "proxied https (invalid hostname + InsecureSkipVerify) -> http": { + serverFunc: httptest.NewServer, + proxyServerFunc: httpsServerInvalidHostname, + clientTLS: &tls.Config{InsecureSkipVerify: true}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, + }, + "proxied https with auth (invalid hostname + InsecureSkipVerify) -> http": { + serverFunc: httptest.NewServer, + proxyServerFunc: httpsServerInvalidHostname, + proxyAuth: url.UserPassword("proxyuser", "proxypasswd"), + clientTLS: &tls.Config{InsecureSkipVerify: true}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, + }, + "proxied https (invalid hostname + hostname verification) -> http": { + serverFunc: httptest.NewServer, + proxyServerFunc: httpsServerInvalidHostname, + clientTLS: &tls.Config{InsecureSkipVerify: false}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: true, // fails because the client doesn't trust the proxy + }, + "proxied https (valid hostname + RootCAs) -> http": { + serverFunc: httptest.NewServer, + proxyServerFunc: httpsServerValidHostname, + clientTLS: &tls.Config{RootCAs: localhostPool}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, + }, + "proxied https with auth (valid hostname + RootCAs) -> http": { + serverFunc: httptest.NewServer, + proxyServerFunc: httpsServerValidHostname, + proxyAuth: url.UserPassword("proxyuser", "proxypasswd"), + clientTLS: &tls.Config{RootCAs: localhostPool}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, + }, + "proxied https (invalid hostname + InsecureSkipVerify) -> https (invalid hostname)": { + serverFunc: httpsServerInvalidHostname, + proxyServerFunc: httpsServerInvalidHostname, + clientTLS: &tls.Config{InsecureSkipVerify: true}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, // works because the test proxy ignores TLS errors + }, + "proxied https with auth (invalid hostname + InsecureSkipVerify) -> https (invalid hostname)": { + serverFunc: httpsServerInvalidHostname, + proxyServerFunc: httpsServerInvalidHostname, + proxyAuth: url.UserPassword("proxyuser", "proxypasswd"), + clientTLS: &tls.Config{InsecureSkipVerify: true}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, // works because the test proxy ignores TLS errors + }, + "proxied https (invalid hostname + hostname verification) -> https (invalid hostname)": { + serverFunc: httpsServerInvalidHostname, + proxyServerFunc: httpsServerInvalidHostname, + clientTLS: &tls.Config{InsecureSkipVerify: false}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: true, // fails because the client doesn't trust the proxy + }, + "proxied https (valid hostname + RootCAs) -> https (valid hostname + RootCAs)": { + serverFunc: httpsServerValidHostname, + proxyServerFunc: httpsServerValidHostname, + clientTLS: &tls.Config{RootCAs: localhostPool}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, + }, + "proxied https with auth (valid hostname + RootCAs) -> https (valid hostname + RootCAs)": { + serverFunc: httpsServerValidHostname, + proxyServerFunc: httpsServerValidHostname, + proxyAuth: url.UserPassword("proxyuser", "proxypasswd"), + clientTLS: &tls.Config{RootCAs: localhostPool}, + serverConnectionHeader: "Upgrade", + serverUpgradeHeader: "SPDY/3.1", + serverStatusCode: http.StatusSwitchingProtocols, + shouldError: false, + }, + } + + for k, testCase := range testCases { + server := testCase.serverFunc(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if testCase.shouldError { + if e, a := httpstream.HeaderUpgrade, req.Header.Get(httpstream.HeaderConnection); e != a { + t.Fatalf("%s: Expected connection=upgrade header, got '%s", k, a) + } + + w.Header().Set(httpstream.HeaderConnection, testCase.serverConnectionHeader) + w.Header().Set(httpstream.HeaderUpgrade, testCase.serverUpgradeHeader) + w.WriteHeader(testCase.serverStatusCode) + + return + } + + streamCh := make(chan httpstream.Stream) + + responseUpgrader := NewResponseUpgrader() + spdyConn := responseUpgrader.UpgradeResponse(w, req, func(s httpstream.Stream, replySent <-chan struct{}) error { + streamCh <- s + return nil + }) + if spdyConn == nil { + t.Fatalf("%s: unexpected nil spdyConn", k) + } + defer spdyConn.Close() + + stream := <-streamCh + io.Copy(stream, stream) + })) + defer server.Close() + + serverURL, err := url.Parse(server.URL) + if err != nil { + t.Fatalf("%s: Error creating request: %s", k, err) + } + req, err := http.NewRequest("GET", server.URL, nil) + if err != nil { + t.Fatalf("%s: Error creating request: %s", k, err) + } + + spdyTransport := NewSpdyRoundTripper(testCase.clientTLS, redirect, redirect) + + var proxierCalled bool + var proxyCalledWithHost string + var proxyCalledWithAuth bool + var proxyCalledWithAuthHeader string + if testCase.proxyServerFunc != nil { + proxyHandler := goproxy.NewProxyHttpServer() + + proxyHandler.OnRequest().HandleConnectFunc(func(host string, ctx *goproxy.ProxyCtx) (*goproxy.ConnectAction, string) { + proxyCalledWithHost = host + + proxyAuthHeaderName := "Proxy-Authorization" + _, proxyCalledWithAuth = ctx.Req.Header[proxyAuthHeaderName] + proxyCalledWithAuthHeader = ctx.Req.Header.Get(proxyAuthHeaderName) + return goproxy.OkConnect, host + }) + + proxy := testCase.proxyServerFunc(proxyHandler) + + spdyTransport.proxier = func(proxierReq *http.Request) (*url.URL, error) { + proxierCalled = true + proxyURL, err := url.Parse(proxy.URL) + if err != nil { + return nil, err + } + proxyURL.User = testCase.proxyAuth + return proxyURL, nil + } + defer proxy.Close() + } + + client := &http.Client{Transport: spdyTransport} + + resp, err := client.Do(req) + var conn httpstream.Connection + if err == nil { + conn, err = spdyTransport.NewConnection(resp) + } + haveErr := err != nil + if e, a := testCase.shouldError, haveErr; e != a { + t.Fatalf("%s: shouldError=%t, got %t: %v", k, e, a, err) + } + if testCase.shouldError { + continue + } + defer conn.Close() + + if resp.StatusCode != http.StatusSwitchingProtocols { + t.Fatalf("%s: expected http 101 switching protocols, got %d", k, resp.StatusCode) + } + + stream, err := conn.CreateStream(http.Header{}) + if err != nil { + t.Fatalf("%s: error creating client stream: %s", k, err) + } + + n, err := stream.Write([]byte("hello")) + if err != nil { + t.Fatalf("%s: error writing to stream: %s", k, err) + } + if n != 5 { + t.Fatalf("%s: Expected to write 5 bytes, but actually wrote %d", k, n) + } + + b := make([]byte, 5) + n, err = stream.Read(b) + if err != nil { + t.Fatalf("%s: error reading from stream: %s", k, err) + } + if n != 5 { + t.Fatalf("%s: Expected to read 5 bytes, but actually read %d", k, n) + } + if e, a := "hello", string(b[0:n]); e != a { + t.Fatalf("%s: expected '%s', got '%s'", k, e, a) + } + + if testCase.proxyServerFunc != nil { + if !proxierCalled { + t.Fatalf("%s: Expected to use a proxy but proxier in SpdyRoundTripper wasn't called", k) + } + if proxyCalledWithHost != serverURL.Host { + t.Fatalf("%s: Expected to see a call to the proxy for backend %q, got %q", k, serverURL.Host, proxyCalledWithHost) + } + } + + var expectedProxyAuth string + if testCase.proxyAuth != nil { + encodedCredentials := base64.StdEncoding.EncodeToString([]byte(testCase.proxyAuth.String())) + expectedProxyAuth = "Basic " + encodedCredentials + } + if len(expectedProxyAuth) == 0 && proxyCalledWithAuth { + t.Fatalf("%s: Proxy authorization unexpected, got %q", k, proxyCalledWithAuthHeader) + } + if proxyCalledWithAuthHeader != expectedProxyAuth { + t.Fatalf("%s: Expected to see a call to the proxy with credentials %q, got %q", k, testCase.proxyAuth, proxyCalledWithAuthHeader) + } + } + }) + } +} + +func TestRoundTripRedirects(t *testing.T) { + tests := []struct { + redirects int32 + expectSuccess bool + }{ + {0, true}, + {1, true}, + {9, true}, + {10, false}, + } + for _, test := range tests { + t.Run(fmt.Sprintf("with %d redirects", test.redirects), func(t *testing.T) { + var redirects int32 = 0 + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if redirects < test.redirects { + atomic.AddInt32(&redirects, 1) + http.Redirect(w, req, "redirect", http.StatusFound) + return + } + streamCh := make(chan httpstream.Stream) + + responseUpgrader := NewResponseUpgrader() + spdyConn := responseUpgrader.UpgradeResponse(w, req, func(s httpstream.Stream, replySent <-chan struct{}) error { + streamCh <- s + return nil + }) + if spdyConn == nil { + t.Fatalf("unexpected nil spdyConn") + } + defer spdyConn.Close() + + stream := <-streamCh + io.Copy(stream, stream) + })) + defer server.Close() + + req, err := http.NewRequest("GET", server.URL, nil) + if err != nil { + t.Fatalf("Error creating request: %s", err) + } + + spdyTransport := NewSpdyRoundTripper(nil, true, true) + client := &http.Client{Transport: spdyTransport} + + resp, err := client.Do(req) + if test.expectSuccess { + if err != nil { + t.Fatalf("error calling Do: %v", err) + } + } else { + if err == nil { + t.Fatalf("expecting an error") + } else if !strings.Contains(err.Error(), "too many redirects") { + t.Fatalf("expecting too many redirects, got %v", err) + } + return + } + + conn, err := spdyTransport.NewConnection(resp) + if err != nil { + t.Fatalf("error calling NewConnection: %v", err) + } + defer conn.Close() + + if resp.StatusCode != http.StatusSwitchingProtocols { + t.Fatalf("expected http 101 switching protocols, got %d", resp.StatusCode) + } + + stream, err := conn.CreateStream(http.Header{}) + if err != nil { + t.Fatalf("error creating client stream: %s", err) + } + + n, err := stream.Write([]byte("hello")) + if err != nil { + t.Fatalf("error writing to stream: %s", err) + } + if n != 5 { + t.Fatalf("Expected to write 5 bytes, but actually wrote %d", n) + } + + b := make([]byte, 5) + n, err = stream.Read(b) + if err != nil { + t.Fatalf("error reading from stream: %s", err) + } + if n != 5 { + t.Fatalf("Expected to read 5 bytes, but actually read %d", n) + } + if e, a := "hello", string(b[0:n]); e != a { + t.Fatalf("expected '%s', got '%s'", e, a) + } + }) + } +} + +// exampleCert was generated from crypto/tls/generate_cert.go with the following command: +// go run generate_cert.go --rsa-bits 512 --host example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h +var exampleCert = []byte(`-----BEGIN CERTIFICATE----- +MIIBdzCCASGgAwIBAgIRAOVTAdPnfbS5V85mfS90TfIwDQYJKoZIhvcNAQELBQAw +EjEQMA4GA1UEChMHQWNtZSBDbzAgFw03MDAxMDEwMDAwMDBaGA8yMDg0MDEyOTE2 +MDAwMFowEjEQMA4GA1UEChMHQWNtZSBDbzBcMA0GCSqGSIb3DQEBAQUAA0sAMEgC +QQCoVSqeu8TBvF+70T7Jm4340YQNhds6IxjRoifenYodAO1dnKGrcbF266DJGunh +nIjQH7B12tduhl0fLK4Ezf7/AgMBAAGjUDBOMA4GA1UdDwEB/wQEAwICpDATBgNV +HSUEDDAKBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MBYGA1UdEQQPMA2CC2V4 +YW1wbGUuY29tMA0GCSqGSIb3DQEBCwUAA0EAk1kVa5uZ/AzwYDVcS9bpM/czwjjV +xq3VeSCfmNa2uNjbFvodmCRwZOHUvipAMGCUCV6j5vMrJ8eMj8tCQ36W9A== +-----END CERTIFICATE-----`) + +var exampleKey = []byte(`-----BEGIN RSA PRIVATE KEY----- +MIIBOgIBAAJBAKhVKp67xMG8X7vRPsmbjfjRhA2F2zojGNGiJ96dih0A7V2coatx +sXbroMka6eGciNAfsHXa126GXR8srgTN/v8CAwEAAQJASdzdD7vKsUwMIejGCUb1 +fAnLTPfAY3lFCa+CmR89nE22dAoRDv+5RbnBsZ58BazPNJHrsVPRlfXB3OQmSQr0 +SQIhANoJhs+xOJE/i8nJv0uAbzKyiD1YkvRkta0GpUOULyAVAiEAxaQus3E/SuqD +P7y5NeJnE7X6XkyC35zrsJRkz7orE8MCIHdDjsI8pjyNDeGqwUCDWE/a6DrmIDwe +emHSqMN2YvChAiEAnxLCM9NWaenOsaIoP+J1rDuvw+4499nJKVqGuVrSCRkCIEqK +4KSchPMc3x8M/uhw9oWTtKFmjA/PPh0FsWCdKrEy +-----END RSA PRIVATE KEY-----`) + +// localhostCert was generated from crypto/tls/generate_cert.go with the following command: +// go run generate_cert.go --rsa-bits 512 --host 127.0.0.1,::1,example.com --ca --start-date "Jan 1 00:00:00 1970" --duration=1000000h +var localhostCert = []byte(`-----BEGIN CERTIFICATE----- +MIIBjzCCATmgAwIBAgIRAKpi2WmTcFrVjxrl5n5YDUEwDQYJKoZIhvcNAQELBQAw +EjEQMA4GA1UEChMHQWNtZSBDbzAgFw03MDAxMDEwMDAwMDBaGA8yMDg0MDEyOTE2 +MDAwMFowEjEQMA4GA1UEChMHQWNtZSBDbzBcMA0GCSqGSIb3DQEBAQUAA0sAMEgC +QQC9fEbRszP3t14Gr4oahV7zFObBI4TfA5i7YnlMXeLinb7MnvT4bkfOJzE6zktn +59zP7UiHs3l4YOuqrjiwM413AgMBAAGjaDBmMA4GA1UdDwEB/wQEAwICpDATBgNV +HSUEDDAKBggrBgEFBQcDATAPBgNVHRMBAf8EBTADAQH/MC4GA1UdEQQnMCWCC2V4 +YW1wbGUuY29thwR/AAABhxAAAAAAAAAAAAAAAAAAAAABMA0GCSqGSIb3DQEBCwUA +A0EAUsVE6KMnza/ZbodLlyeMzdo7EM/5nb5ywyOxgIOCf0OOLHsPS9ueGLQX9HEG +//yjTXuhNcUugExIjM/AIwAZPQ== +-----END CERTIFICATE-----`) + +// localhostKey is the private key for localhostCert. +var localhostKey = []byte(`-----BEGIN RSA PRIVATE KEY----- +MIIBOwIBAAJBAL18RtGzM/e3XgavihqFXvMU5sEjhN8DmLtieUxd4uKdvsye9Phu +R84nMTrOS2fn3M/tSIezeXhg66quOLAzjXcCAwEAAQJBAKcRxH9wuglYLBdI/0OT +BLzfWPZCEw1vZmMR2FF1Fm8nkNOVDPleeVGTWoOEcYYlQbpTmkGSxJ6ya+hqRi6x +goECIQDx3+X49fwpL6B5qpJIJMyZBSCuMhH4B7JevhGGFENi3wIhAMiNJN5Q3UkL +IuSvv03kaPR5XVQ99/UeEetUgGvBcABpAiBJSBzVITIVCGkGc7d+RCf49KTCIklv +bGWObufAR8Ni4QIgWpILjW8dkGg8GOUZ0zaNA6Nvt6TIv2UWGJ4v5PoV98kCIQDx +rIiZs5QbKdycsv9gQJzwQAogC8o04X3Zz3dsoX+h4A== +-----END RSA PRIVATE KEY-----`) diff --git a/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go b/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go new file mode 100644 index 0000000000..045d214d2b --- /dev/null +++ b/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade.go @@ -0,0 +1,107 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package spdy + +import ( + "bufio" + "fmt" + "io" + "net" + "net/http" + "strings" + "sync/atomic" + + "k8s.io/apimachinery/pkg/util/httpstream" + "k8s.io/apimachinery/pkg/util/runtime" +) + +const HeaderSpdy31 = "SPDY/3.1" + +// responseUpgrader knows how to upgrade HTTP responses. It +// implements the httpstream.ResponseUpgrader interface. +type responseUpgrader struct { +} + +// connWrapper is used to wrap a hijacked connection and its bufio.Reader. All +// calls will be handled directly by the underlying net.Conn with the exception +// of Read and Close calls, which will consider data in the bufio.Reader. This +// ensures that data already inside the used bufio.Reader instance is also +// read. +type connWrapper struct { + net.Conn + closed int32 + bufReader *bufio.Reader +} + +func (w *connWrapper) Read(b []byte) (n int, err error) { + if atomic.LoadInt32(&w.closed) == 1 { + return 0, io.EOF + } + return w.bufReader.Read(b) +} + +func (w *connWrapper) Close() error { + err := w.Conn.Close() + atomic.StoreInt32(&w.closed, 1) + return err +} + +// NewResponseUpgrader returns a new httpstream.ResponseUpgrader that is +// capable of upgrading HTTP responses using SPDY/3.1 via the +// spdystream package. +func NewResponseUpgrader() httpstream.ResponseUpgrader { + return responseUpgrader{} +} + +// UpgradeResponse upgrades an HTTP response to one that supports multiplexed +// streams. newStreamHandler will be called synchronously whenever the +// other end of the upgraded connection creates a new stream. +func (u responseUpgrader) UpgradeResponse(w http.ResponseWriter, req *http.Request, newStreamHandler httpstream.NewStreamHandler) httpstream.Connection { + connectionHeader := strings.ToLower(req.Header.Get(httpstream.HeaderConnection)) + upgradeHeader := strings.ToLower(req.Header.Get(httpstream.HeaderUpgrade)) + if !strings.Contains(connectionHeader, strings.ToLower(httpstream.HeaderUpgrade)) || !strings.Contains(upgradeHeader, strings.ToLower(HeaderSpdy31)) { + errorMsg := fmt.Sprintf("unable to upgrade: missing upgrade headers in request: %#v", req.Header) + http.Error(w, errorMsg, http.StatusBadRequest) + return nil + } + + hijacker, ok := w.(http.Hijacker) + if !ok { + errorMsg := fmt.Sprintf("unable to upgrade: unable to hijack response") + http.Error(w, errorMsg, http.StatusInternalServerError) + return nil + } + + w.Header().Add(httpstream.HeaderConnection, httpstream.HeaderUpgrade) + w.Header().Add(httpstream.HeaderUpgrade, HeaderSpdy31) + w.WriteHeader(http.StatusSwitchingProtocols) + + conn, bufrw, err := hijacker.Hijack() + if err != nil { + runtime.HandleError(fmt.Errorf("unable to upgrade: error hijacking response: %v", err)) + return nil + } + + connWithBuf := &connWrapper{Conn: conn, bufReader: bufrw.Reader} + spdyConn, err := NewServerConnection(connWithBuf, newStreamHandler) + if err != nil { + runtime.HandleError(fmt.Errorf("unable to upgrade: error creating SPDY server connection: %v", err)) + return nil + } + + return spdyConn +} diff --git a/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade_test.go b/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade_test.go new file mode 100644 index 0000000000..5a514dd5bf --- /dev/null +++ b/vendor_fixes/k8s.io/apimachinery/pkg/util/httpstream/spdy/upgrade_test.go @@ -0,0 +1,93 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package spdy + +import ( + "net/http" + "net/http/httptest" + "testing" +) + +func TestUpgradeResponse(t *testing.T) { + testCases := []struct { + connectionHeader string + upgradeHeader string + shouldError bool + }{ + { + connectionHeader: "", + upgradeHeader: "", + shouldError: true, + }, + { + connectionHeader: "Upgrade", + upgradeHeader: "", + shouldError: true, + }, + { + connectionHeader: "", + upgradeHeader: "SPDY/3.1", + shouldError: true, + }, + { + connectionHeader: "Upgrade", + upgradeHeader: "SPDY/3.1", + shouldError: false, + }, + } + + for i, testCase := range testCases { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + upgrader := NewResponseUpgrader() + conn := upgrader.UpgradeResponse(w, req, nil) + haveErr := conn == nil + if e, a := testCase.shouldError, haveErr; e != a { + t.Fatalf("%d: expected shouldErr=%t, got %t", i, testCase.shouldError, haveErr) + } + if haveErr { + return + } + if conn == nil { + t.Fatalf("%d: unexpected nil conn", i) + } + defer conn.Close() + })) + defer server.Close() + + req, err := http.NewRequest("GET", server.URL, nil) + if err != nil { + t.Fatalf("%d: error creating request: %s", i, err) + } + + req.Header.Set("Connection", testCase.connectionHeader) + req.Header.Set("Upgrade", testCase.upgradeHeader) + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + t.Fatalf("%d: unexpected non-nil err from client.Do: %s", i, err) + } + + if testCase.shouldError { + continue + } + + if resp.StatusCode != http.StatusSwitchingProtocols { + t.Fatalf("%d: expected status 101 switching protocols, got %d", i, resp.StatusCode) + } + } +}