diff --git a/integration/kube_integration_test.go b/integration/kube_integration_test.go index 94889bd91ded4..9446c339f19cd 100644 --- a/integration/kube_integration_test.go +++ b/integration/kube_integration_test.go @@ -33,18 +33,12 @@ import ( "testing" "time" - "github.com/gravitational/trace" - log "github.com/sirupsen/logrus" - "github.com/stretchr/testify/require" - "github.com/gravitational/teleport" - apidefaults "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/profile" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/lib" "github.com/gravitational/teleport/lib/auth/testauthority" "github.com/gravitational/teleport/lib/events" - kubeutils "github.com/gravitational/teleport/lib/kube/utils" "github.com/gravitational/teleport/lib/service" "github.com/gravitational/teleport/lib/services" "github.com/gravitational/teleport/lib/session" @@ -52,16 +46,24 @@ import ( "github.com/gravitational/teleport/lib/utils" "github.com/gravitational/teleport/lib/utils/testlog" - v1 "k8s.io/api/core/v1" + apidefaults "github.com/gravitational/teleport/api/defaults" + kubeutils "github.com/gravitational/teleport/lib/kube/utils" + + "github.com/gravitational/trace" + "github.com/stretchr/testify/require" + "golang.org/x/net/http2" "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - streamspdy "k8s.io/apimachinery/pkg/util/httpstream/spdy" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/portforward" "k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/transport" "k8s.io/client-go/transport/spdy" + + log "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + streamspdy "k8s.io/apimachinery/pkg/util/httpstream/spdy" ) type KubeSuite struct { @@ -156,6 +158,7 @@ func TestKube(t *testing.T) { t.Run("Exec", suite.bind(testKubeExec)) t.Run("Deny", suite.bind(testKubeDeny)) t.Run("PortForward", suite.bind(testKubePortForward)) + t.Run("TransportProtocol", suite.bind(testKubeTransportProtocol)) t.Run("TrustedClustersClientCert", suite.bind(testKubeTrustedClustersClientCert)) t.Run("TrustedClustersSNI", suite.bind(testKubeTrustedClustersSNI)) t.Run("Disconnect", suite.bind(testKubeDisconnect)) @@ -979,7 +982,6 @@ loop: err = impersonatingForwarder.ForwardPorts() require.Error(t, err) require.Regexp(t, ".*impersonation request has been denied.*", err.Error()) - } // TestKubeDisconnect tests kubernetes session disconnects @@ -1094,6 +1096,75 @@ func runKubeDisconnectTest(t *testing.T, suite *KubeSuite, tc disconnectTestCase } } +// testKubeTransportProtocol tests the proxy transport protocol capabilities +func testKubeTransportProtocol(t *testing.T, suite *KubeSuite) { + tconf := suite.teleKubeConfig(Host) + + teleport := NewInstance(InstanceConfig{ + ClusterName: Site, + HostID: HostID, + NodeName: Host, + Priv: suite.priv, + Pub: suite.pub, + log: suite.log, + }) + + username := suite.me.Username + kubeGroups := []string{testImpersonationGroup} + role, err := types.NewRole("kubemaster", types.RoleSpecV4{ + Allow: types.RoleConditions{ + Logins: []string{username}, + KubeGroups: kubeGroups, + }, + }) + require.NoError(t, err) + teleport.AddUserWithRole(username, role) + + err = teleport.CreateEx(t, nil, tconf) + require.NoError(t, err) + + err = teleport.Start() + require.NoError(t, err) + defer teleport.StopAll() + + // set up kube configuration using proxy + _, proxyClientConfig, err := kubeProxyClient(kubeProxyConfig{ + t: teleport, + username: username, + kubeGroups: kubeGroups, + }) + require.NoError(t, err) + + u, err := url.Parse(proxyClientConfig.Host) + require.NoError(t, err) + + u.Scheme = "https" + u.Path = fmt.Sprintf("/api/v1/namespaces/%v/pods/%v", testNamespace, testPod) + + tlsConfig, err := tlsClientConfig(proxyClientConfig) + require.NoError(t, err) + + trans := &http.Transport{ + TLSClientConfig: tlsConfig, + } + + // call proxy with an HTTP1 client + client := &http.Client{Transport: trans} + resp1, err := client.Get(u.String()) + require.NoError(t, err) + defer resp1.Body.Close() + require.Equal(t, resp1.Proto, "HTTP/1.1") + + // call proxy with an HTTP2 client + err = http2.ConfigureTransport(trans) + require.NoError(t, err) + + resp2, err := client.Get(u.String()) + require.NoError(t, err) + defer resp2.Body.Close() + require.Equal(t, resp2.Proto, "HTTP/2.0") +} + // teleKubeConfig sets up teleport with kubernetes turned on func (s *KubeSuite) teleKubeConfig(hostname string) *service.Config { tconf := service.MakeDefaultConfig() diff --git a/lib/kube/proxy/forwarder.go b/lib/kube/proxy/forwarder.go index d8131602cb354..b06b0fcadd052 100644 --- a/lib/kube/proxy/forwarder.go +++ b/lib/kube/proxy/forwarder.go @@ -60,6 +60,7 @@ import ( "github.com/julienschmidt/httprouter" log "github.com/sirupsen/logrus" "golang.org/x/crypto/ssh" + "golang.org/x/net/http2" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/httpstream" @@ -1440,7 +1441,11 @@ func (f *Forwarder) newClusterSessionRemoteCluster(ctx authContext) (*clusterSes tlsConfig: tlsConfig, } - transport := f.newTransport(sess.Dial, sess.tlsConfig) + transport, err := f.newTransport(sess.Dial, sess.tlsConfig) + if err != nil { + return nil, trace.Wrap(err) + } + sess.forwarder, err = forward.New( forward.FlushInterval(100*time.Millisecond), forward.RoundTripper(transport), @@ -1512,13 +1517,18 @@ func (f *Forwarder) newClusterSessionLocal(ctx authContext) (*clusterSession, er tlsConfig: creds.tlsConfig, } + t, err := f.newTransport(sess.Dial, sess.tlsConfig) + if err != nil { + return nil, trace.Wrap(err) + } + // When running inside Kubernetes cluster or using auth/exec providers, // kubeconfig provides a transport wrapper that adds a bearer token to // requests // // When forwarding request to a remote cluster, this is not needed // as the proxy uses client cert auth to reach out to remote proxy. - transport, err := creds.wrapTransport(f.newTransport(sess.Dial, sess.tlsConfig)) + transport, err := creds.wrapTransport(t) if err != nil { return nil, trace.Wrap(err) } @@ -1558,7 +1568,11 @@ func (f *Forwarder) newClusterSessionDirect(ctx authContext, endpoints []kubeClu noAuditEvents: true, } - transport := f.newTransport(sess.Dial, sess.tlsConfig) + transport, err := f.newTransport(sess.Dial, sess.tlsConfig) + if err != nil { + return nil, trace.Wrap(err) + } + sess.forwarder, err = forward.New( forward.FlushInterval(100*time.Millisecond), forward.RoundTripper(transport), @@ -1575,8 +1589,8 @@ func (f *Forwarder) newClusterSessionDirect(ctx authContext, endpoints []kubeClu // DialFunc is a network dialer function that returns a network connection type DialFunc func(string, string) (net.Conn, error) -func (f *Forwarder) newTransport(dial DialFunc, tlsConfig *tls.Config) *http.Transport { - return &http.Transport{ +func (f *Forwarder) newTransport(dial DialFunc, tlsConfig *tls.Config) (*http.Transport, error) { + t := &http.Transport{ Dial: dial, TLSClientConfig: tlsConfig, // Increase the size of the connection pool. This substantially improves the @@ -1587,8 +1601,16 @@ func (f *Forwarder) newTransport(dial DialFunc, tlsConfig *tls.Config) *http.Tra // IdleConnTimeout defines the maximum amount of time before idle connections // are closed. Leaving this unset will lead to connections open forever and // will cause memory leaks in a long running process. - IdleConnTimeout: defaults.HTTPIdleTimeout, + IdleConnTimeout: defaults.HTTPIdleTimeout, + ForceAttemptHTTP2: true, + } + + err := http2.ConfigureTransport(t) + if err != nil { + return nil, trace.Wrap(err) } + + return t, nil } // getOrCreateRequestContext creates a new certificate request for a given context, diff --git a/lib/kube/proxy/server.go b/lib/kube/proxy/server.go index 0b84fa54232d1..bcb2cc2696c81 100644 --- a/lib/kube/proxy/server.go +++ b/lib/kube/proxy/server.go @@ -34,6 +34,7 @@ import ( "github.com/gravitational/trace" log "github.com/sirupsen/logrus" + "golang.org/x/net/http2" ) // TLSServerConfig is a configuration for TLS server @@ -124,6 +125,7 @@ func NewTLSServer(cfg TLSServerConfig) (*TLSServer, error) { Server: &http.Server{ Handler: limiter, ReadHeaderTimeout: apidefaults.DefaultDialTimeout * 2, + TLSConfig: cfg.TLS, }, } server.TLS.GetConfigForClient = server.GetConfigForClient @@ -173,11 +175,15 @@ func (t *TLSServer) Serve(listener net.Listener) error { if err != nil { return trace.Wrap(err) } + go mux.Serve() defer mux.Close() t.mu.Lock() t.listener = mux.TLS() + if err = http2.ConfigureServer(t.Server, &http2.Server{}); err != nil { + return trace.Wrap(err) + } t.mu.Unlock() if t.heartbeat != nil {