Skip to content

Commit

Permalink
Update the web UI to stream session playback (#36168)
Browse files Browse the repository at this point in the history
Prior to this, the web UI would download the entire session recording
and store it in JavaScript memory before starting playback. This caused
the browser tab to crash when attempting to play back sessions larger
than ~5MB.

For playback, we use a custom binary protocol rather than the protobuf
envelopes that we use for live sessions. The protobuf envelopes only
send raw PTY data, there is no place to put the timing data. Adding
fields to the envelope would be a disruptive change because our JS
codec is hand-rolled and we'd have to make the parsing updates manually.

Updates gravitational/teleport-private#1024
Closes gravitational/teleport-private#665
Closes #10578
  • Loading branch information
zmb3 authored Jan 6, 2024
1 parent 0161397 commit 6dad93c
Show file tree
Hide file tree
Showing 25 changed files with 817 additions and 1,968 deletions.
4 changes: 2 additions & 2 deletions lib/auth/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ func NewAPIServer(config *APIConfig) (http.Handler, error) {
srv.POST("/:version/tokens/register", srv.WithAuth(srv.registerUsingToken))

// Active sessions
srv.GET("/:version/namespaces/:namespace/sessions/:id/stream", srv.WithAuth(srv.getSessionChunk))
srv.GET("/:version/namespaces/:namespace/sessions/:id/events", srv.WithAuth(srv.getSessionEvents))
srv.GET("/:version/namespaces/:namespace/sessions/:id/stream", srv.WithAuth(srv.getSessionChunk)) // DELETE IN 16(zmb3)
srv.GET("/:version/namespaces/:namespace/sessions/:id/events", srv.WithAuth(srv.getSessionEvents)) // DELETE IN 16(zmb3)

// Namespaces
srv.POST("/:version/namespaces", srv.WithAuth(srv.upsertNamespace))
Expand Down
17 changes: 9 additions & 8 deletions lib/player/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func New(cfg *Config) (*Player, error) {
log: log,
sessionID: cfg.SessionID,
streamer: cfg.Streamer,
emit: make(chan events.AuditEvent, 64),
emit: make(chan events.AuditEvent, 1024),
playPause: make(chan chan struct{}, 1),
done: make(chan struct{}),
}
Expand Down Expand Up @@ -185,7 +185,7 @@ func (p *Player) stream() {
}

currentDelay := getDelay(evt)
if currentDelay > 0 && currentDelay > lastDelay {
if currentDelay > 0 && currentDelay >= lastDelay {
switch adv := p.advanceTo.Load(); {
case adv >= currentDelay:
// no timing delay necessary, we are fast forwarding
Expand Down Expand Up @@ -215,12 +215,13 @@ func (p *Player) stream() {
lastDelay = currentDelay
}

select {
case p.emit <- evt:
p.lastPlayed.Store(currentDelay)
default:
p.log.Warnf("dropped event %v, reader too slow", evt.GetID())
}
// if the receiver can't keep up, let the channel throttle us
// (it's better for playback to be a little slower than realtime
// than to drop events)
//
// TODO: consider a select with a timeout to detect blocked readers?
p.emit <- evt
p.lastPlayed.Store(currentDelay)
}
}
}
Expand Down
1 change: 1 addition & 0 deletions lib/player/player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func TestClose(t *testing.T) {
_, ok := <-p.C()
require.False(t, ok, "player channel should have been closed")
require.NoError(t, p.Err())
require.Equal(t, int64(1000), p.LastPlayed())
}

func TestSeekForward(t *testing.T) {
Expand Down
9 changes: 7 additions & 2 deletions lib/web/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,8 +717,11 @@ func (h *Handler) bindDefaultEndpoints() {
// Audit events handlers.
h.GET("/webapi/sites/:site/events/search", h.WithClusterAuth(h.clusterSearchEvents)) // search site events
h.GET("/webapi/sites/:site/events/search/sessions", h.WithClusterAuth(h.clusterSearchSessionEvents)) // search site session events
h.GET("/webapi/sites/:site/sessions/:sid/events", h.WithClusterAuth(h.siteSessionEventsGet)) // get recorded session's timing information (from events)
h.GET("/webapi/sites/:site/sessions/:sid/stream", h.siteSessionStreamGet) // get recorded session's bytes (from events)
h.GET("/webapi/sites/:site/ttyplayback/:sid", h.WithClusterAuth(h.ttyPlaybackHandle))

// DELETE in 16(zmb3): v15+ web UIs use new streaming 'ttyplayback' endpoint
h.GET("/webapi/sites/:site/sessions/:sid/events", h.WithClusterAuth(h.siteSessionEventsGet)) // get recorded session's timing information (from events)
h.GET("/webapi/sites/:site/sessions/:sid/stream", h.siteSessionStreamGet) // get recorded session's bytes (from events)

// scp file transfer
h.GET("/webapi/sites/:site/nodes/:server/:login/scp", h.WithClusterAuth(h.transferFile))
Expand Down Expand Up @@ -3417,6 +3420,8 @@ func queryOrder(query url.Values, name string, def types.EventOrder) (types.Even
// It returns the binary stream unencoded, directly in the respose body,
// with Content-Type of application/octet-stream, gzipped with up to 95%
// compression ratio.
//
// DELETE IN 16(zmb3)
func (h *Handler) siteSessionStreamGet(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
httplib.SetNoCacheHeaders(w.Header())

Expand Down
Loading

0 comments on commit 6dad93c

Please sign in to comment.