Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v15] Add support for Kubernetes websocket subprotocol v5 incoming connections. #39770

Merged
merged 1 commit into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 31 additions & 2 deletions lib/kube/proxy/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func TestExecKubeService(t *testing.T) {
},
},
{
name: "Websocket protocol",
name: "Websocket protocol v4",
args: args{
// We can delete the dummy client once https://github.com/kubernetes/kubernetes/pull/110142
// is merged into k8s go-client.
Expand All @@ -137,6 +137,15 @@ func TestExecKubeService(t *testing.T) {
config: configWithSingleKubeUser,
},
},
{
name: "Websocket protocol v5",
args: args{
executorBuilder: func(c *rest.Config, s string, u *url.URL) (remotecommand.Executor, error) {
return remotecommand.NewWebSocketExecutor(c, s, u.String())
},
config: configWithSingleKubeUser,
},
},
{
name: "SPDY protocol for user with multiple kubernetes users",
args: args{
Expand All @@ -146,7 +155,7 @@ func TestExecKubeService(t *testing.T) {
},
},
{
name: "Websocket protocol for user with multiple kubernetes users",
name: "Websocket protocol v4 for user with multiple kubernetes users",
args: args{
// We can delete the dummy client once https://github.com/kubernetes/kubernetes/pull/110142
// is merged into k8s go-client.
Expand All @@ -158,6 +167,16 @@ func TestExecKubeService(t *testing.T) {
impersonateUser: "admin",
},
},
{
name: "Websocket protocol v5 for user with multiple kubernetes users",
args: args{
executorBuilder: func(c *rest.Config, s string, u *url.URL) (remotecommand.Executor, error) {
return remotecommand.NewWebSocketExecutor(c, s, u.String())
},
config: configMultiKubeUsers,
impersonateUser: "admin",
},
},
{
name: "SPDY protocol for user with multiple kubernetes users without specifying impersonate user",
args: args{
Expand All @@ -166,6 +185,16 @@ func TestExecKubeService(t *testing.T) {
},
wantErr: true,
},
{
name: "Websocket protocol v5 for user with multiple kubernetes users without specifying impersonate user",
args: args{
executorBuilder: func(c *rest.Config, s string, u *url.URL) (remotecommand.Executor, error) {
return remotecommand.NewWebSocketExecutor(c, s, u.String())
},
config: configMultiKubeUsers,
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
42 changes: 21 additions & 21 deletions lib/kube/proxy/remotecommand_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,17 @@ import (
"github.com/go-logr/logr"
"github.com/gravitational/trace"
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
"k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
"k8s.io/klog/v2"
)

const (
stdinChannel = iota
stdoutChannel
stderrChannel
errorChannel
resizeChannel

preV4BinaryWebsocketProtocol = wsstream.ChannelWebSocketProtocol
preV4Base64WebsocketProtocol = wsstream.Base64ChannelWebSocketProtocol
v4BinaryWebsocketProtocol = "v4." + wsstream.ChannelWebSocketProtocol
v4Base64WebsocketProtocol = "v4." + wsstream.Base64ChannelWebSocketProtocol
v5BinaryWebsocketProtocol = remotecommand.StreamProtocolV5Name
)

func init() {
Expand All @@ -59,11 +55,11 @@ func init() {
func createChannels(req remoteCommandRequest) []wsstream.ChannelType {
// open the requested channels, and always open the error channel
channels := make([]wsstream.ChannelType, 5)
channels[stdinChannel] = readChannel(req.stdin)
channels[stdoutChannel] = writeChannel(req.stdout)
channels[stderrChannel] = writeChannel(req.stderr)
channels[errorChannel] = wsstream.WriteChannel
channels[resizeChannel] = wsstream.ReadChannel
channels[remotecommand.StreamStdIn] = readChannel(req.stdin)
channels[remotecommand.StreamStdOut] = writeChannel(req.stdout)
channels[remotecommand.StreamStdErr] = writeChannel(req.stderr)
channels[remotecommand.StreamErr] = wsstream.WriteChannel
channels[remotecommand.StreamResize] = wsstream.ReadChannel
return channels
}

Expand Down Expand Up @@ -108,6 +104,10 @@ func createWebSocketStreams(req remoteCommandRequest) (*remoteCommandProxy, erro
Binary: false,
Channels: channels,
},
v5BinaryWebsocketProtocol: {
Binary: true,
Channels: channels,
},
})
conn.SetIdleTimeout(IdleTimeout)
negotiatedProtocol, streams, err := conn.Open(
Expand All @@ -121,20 +121,20 @@ func createWebSocketStreams(req remoteCommandRequest) (*remoteCommandProxy, erro
// Send an empty message to the lowest writable channel to notify the client the connection is established
switch {
case req.stdout:
streams[stdoutChannel].Write([]byte{})
streams[remotecommand.StreamStdOut].Write([]byte{})
case req.stderr:
streams[stderrChannel].Write([]byte{})
streams[remotecommand.StreamStdErr].Write([]byte{})
default:
streams[errorChannel].Write([]byte{})
streams[streamErr].Write([]byte{})
}

proxy := &remoteCommandProxy{
conn: conn,
stdinStream: streams[stdinChannel],
stdoutStream: streams[stdoutChannel],
stderrStream: streams[stderrChannel],
stdinStream: streams[remotecommand.StreamStdIn],
stdoutStream: streams[remotecommand.StreamStdOut],
stderrStream: streams[remotecommand.StreamStdErr],
tty: req.tty,
resizeStream: streams[resizeChannel],
resizeStream: streams[remotecommand.StreamResize],
}

// When stdin, stdout or stderr are not enabled, websocket creates a io.Pipe
Expand All @@ -153,10 +153,10 @@ func createWebSocketStreams(req remoteCommandRequest) (*remoteCommandProxy, erro
}

switch negotiatedProtocol {
case v4BinaryWebsocketProtocol, v4Base64WebsocketProtocol:
proxy.writeStatus = v4WriteStatusFunc(streams[errorChannel])
case v5BinaryWebsocketProtocol, v4BinaryWebsocketProtocol, v4Base64WebsocketProtocol:
proxy.writeStatus = v4WriteStatusFunc(streams[remotecommand.StreamErr])
default:
proxy.writeStatus = v1WriteStatusFunc(streams[errorChannel])
proxy.writeStatus = v1WriteStatusFunc(streams[remotecommand.StreamErr])
}

return proxy, nil
Expand Down
112 changes: 106 additions & 6 deletions lib/kube/proxy/testing/kube_server/kube_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ import (
"k8s.io/apimachinery/pkg/util/httpstream"
spdystream "k8s.io/apimachinery/pkg/util/httpstream/spdy"
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
apiremotecommand "k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/apiserver/pkg/endpoints/responsewriter"
"k8s.io/client-go/tools/remotecommand"

"github.com/gravitational/teleport/lib/defaults"
Expand Down Expand Up @@ -76,6 +78,12 @@ const (
// Value for streamType header for terminal resize stream
StreamTypeResize = "resize"

preV4BinaryWebsocketProtocol = wsstream.ChannelWebSocketProtocol
preV4Base64WebsocketProtocol = wsstream.Base64ChannelWebSocketProtocol
v4BinaryWebsocketProtocol = "v4." + wsstream.ChannelWebSocketProtocol
v4Base64WebsocketProtocol = "v4." + wsstream.Base64ChannelWebSocketProtocol
v5BinaryWebsocketProtocol = "v5." + wsstream.ChannelWebSocketProtocol

// CloseStreamMessage is an expected keyword if stdin is enable and the
// underlying protocol does not support half closed streams.
// It is only required for websockets.
Expand Down Expand Up @@ -335,12 +343,15 @@ func createRemoteCommandProxy(req remoteCommandRequest) (*remoteCommandProxy, er
err error
)
if wsstream.IsWebSocketRequest(req.httpRequest) {
return nil, fmt.Errorf("only SPDY streams upgrades are supported")
}

proxy, err = createSPDYStreams(req)
if err != nil {
return nil, trace.Wrap(err)
proxy, err = createWebSocketStreams(req)
if err != nil {
return nil, trace.Wrap(err)
}
} else {
proxy, err = createSPDYStreams(req)
if err != nil {
return nil, trace.Wrap(err)
}
}

if proxy.resizeStream != nil {
Expand All @@ -350,6 +361,95 @@ func createRemoteCommandProxy(req remoteCommandRequest) (*remoteCommandProxy, er
return proxy, nil
}

func channelOrIgnore(channel wsstream.ChannelType, real bool) wsstream.ChannelType {
if real {
return channel
}
return wsstream.IgnoreChannel
}

func createWebSocketStreams(req remoteCommandRequest) (*remoteCommandProxy, error) {
channels := make([]wsstream.ChannelType, 5)
channels[apiremotecommand.StreamStdIn] = channelOrIgnore(wsstream.ReadChannel, req.stdin)
channels[apiremotecommand.StreamStdOut] = channelOrIgnore(wsstream.WriteChannel, req.stdout)
channels[apiremotecommand.StreamStdErr] = channelOrIgnore(wsstream.WriteChannel, req.stderr)
channels[apiremotecommand.StreamErr] = wsstream.WriteChannel
channels[apiremotecommand.StreamResize] = wsstream.ReadChannel

conn := wsstream.NewConn(map[string]wsstream.ChannelProtocolConfig{
"": {
Binary: true,
Channels: channels,
},
preV4BinaryWebsocketProtocol: {
Binary: true,
Channels: channels,
},
preV4Base64WebsocketProtocol: {
Binary: false,
Channels: channels,
},
v4BinaryWebsocketProtocol: {
Binary: true,
Channels: channels,
},
v4Base64WebsocketProtocol: {
Binary: false,
Channels: channels,
},
v5BinaryWebsocketProtocol: {
Binary: true,
Channels: channels,
},
})
conn.SetIdleTimeout(IdleTimeout)
_, streams, err := conn.Open(
responsewriter.GetOriginal(req.httpResponseWriter),
req.httpRequest,
)
if err != nil {
return nil, trace.Wrap(err, "unable to upgrade websocket connection")
}

// Send an empty message to the lowest writable channel to notify the client the connection is established
switch {
case req.stdout:
streams[apiremotecommand.StreamStdOut].Write([]byte{})
case req.stderr:
streams[apiremotecommand.StreamStdErr].Write([]byte{})
default:
streams[apiremotecommand.StreamErr].Write([]byte{})
}

proxy := &remoteCommandProxy{
conn: conn,
stdinStream: streams[apiremotecommand.StreamStdIn],
stdoutStream: streams[apiremotecommand.StreamStdOut],
stderrStream: streams[apiremotecommand.StreamStdErr],
tty: req.tty,
resizeStream: streams[apiremotecommand.StreamResize],
}

// When stdin, stdout or stderr are not enabled, websocket creates a io.Pipe
// for them so they are not nil.
// Since we need to forward to another k8s server (Teleport or real k8s API),
// we must disabled the readers, otherwise the SPDY executor will wait for
// read/write into the streams and will hang.
if !req.stdin {
proxy.stdinStream = nil
}
if !req.stdout {
proxy.stdoutStream = nil
}
if !req.stderr {
proxy.stderrStream = nil
}

proxy.writeStatus = v4WriteStatusFunc(streams[apiremotecommand.StreamErr])

return proxy, nil
}

func createSPDYStreams(req remoteCommandRequest) (*remoteCommandProxy, error) {
protocol, err := httpstream.Handshake(req.httpRequest, req.httpResponseWriter, []string{StreamProtocolV4Name})
if err != nil {
Expand Down
Loading