Skip to content

Commit

Permalink
Convert the desktop sesssion player to the new player API (#34070)
Browse files Browse the repository at this point in the history
This makes a few changes to the player API to ensure that errors
are correctly propagated.
  • Loading branch information
zmb3 authored and Alex McGrath committed Feb 13, 2024
1 parent 61bb058 commit 66c8038
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 231 deletions.
17 changes: 12 additions & 5 deletions lib/events/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -985,7 +986,9 @@ func (l *AuditLog) StreamSessionEvents(ctx context.Context, sessionID session.ID
if rmErr := os.Remove(tarballPath); rmErr != nil {
l.log.WithError(rmErr).Warningf("Failed to remove file %v.", tarballPath)
}

if errors.Is(err, fs.ErrNotExist) {
err = trace.NotFound("a recording for session %v was not found", sessionID)
}
e <- trace.Wrap(err)
return c, e
}
Expand All @@ -1003,7 +1006,7 @@ func (l *AuditLog) StreamSessionEvents(ctx context.Context, sessionID session.ID
for {
if ctx.Err() != nil {
e <- trace.Wrap(ctx.Err())
break
return
}

event, err := protoReader.Read(ctx)
Expand All @@ -1013,12 +1016,16 @@ func (l *AuditLog) StreamSessionEvents(ctx context.Context, sessionID session.ID
} else {
close(c)
}

break
return
}

if event.GetIndex() >= startIndex {
c <- event
select {
case c <- event:
case <-ctx.Done():
e <- trace.Wrap(ctx.Err())
return
}
}
}
}()
Expand Down
37 changes: 24 additions & 13 deletions lib/player/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"context"
"errors"
"math"
"os"
"sync/atomic"
"time"

Expand Down Expand Up @@ -64,10 +63,15 @@ type Player struct {
// and is inspired by "Rethinking Classical Concurrency Patterns"
// by Bryan C. Mills (GopherCon 2018): https://www.youtube.com/watch?v=5zXAHh5tJqQ
playPause chan chan struct{}

// err holds the error (if any) encountered during playback
err error
}

const normalPlayback = math.MinInt64

// Streamer is the underlying streamer that provides
// access to recorded session events.
type Streamer interface {
StreamSessionEvents(
ctx context.Context,
Expand Down Expand Up @@ -100,10 +104,7 @@ func New(cfg *Config) (*Player, error) {

var log logrus.FieldLogger = cfg.Log
if log == nil {
l := logrus.New().WithField(trace.Component, "player")
l.Logger.SetOutput(os.Stdout) // TODO(zmb3) remove
l.Logger.SetLevel(logrus.DebugLevel)
log = l
log = logrus.New().WithField(trace.Component, "player")
}

p := &Player{
Expand Down Expand Up @@ -137,6 +138,11 @@ const (
maxPlaybackSpeed = 16
)

// SetSpeed adjusts the playback speed of the player.
// It can be called at any time (the player can be in a playing
// or paused state). A speed of 1.0 plays back at regular speed,
// while a speed of 2.0 plays back twice as fast as originally
// recorded. Valid speeds range from 0.25 to 16.0.
func (p *Player) SetSpeed(s float64) error {
if s < minPlaybackSpeed || s > maxPlaybackSpeed {
return trace.BadParameter("speed %v is out of range", s)
Expand All @@ -146,22 +152,24 @@ func (p *Player) SetSpeed(s float64) error {
}

func (p *Player) stream() {
// TODO(zmb3): consider using context instead of close chan
eventsC, errC := p.streamer.StreamSessionEvents(context.TODO(), p.sessionID, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

eventsC, errC := p.streamer.StreamSessionEvents(ctx, p.sessionID, 0)
lastDelay := int64(0)
for {
select {
case <-p.done:
close(p.emit)
return
case err := <-errC:
// TODO(zmb3): figure out how to surface the error
// (probably close the chan and expose a method)
p.log.Warn(err)
p.err = err
close(p.emit)
return
case evt := <-eventsC:
if evt == nil {
p.log.Debug("reached end of playback")
p.log.Debugf("reached end of playback for session %v", p.sessionID)
close(p.emit)
return
}
Expand Down Expand Up @@ -203,7 +211,6 @@ func (p *Player) stream() {
lastDelay = currentDelay
}

p.log.Debugf("playing %v (%v)", evt.GetType(), evt.GetID())
select {
case p.emit <- evt:
p.lastPlayed.Store(currentDelay)
Expand All @@ -229,7 +236,12 @@ func (p *Player) C() <-chan events.AuditEvent {
return p.emit
}

// TODO(zmb3): add an Err() method to be checked after C is closed
// Err returns the error (if any) that occurred during playback.
// It should only be called after the channel returned by [C] is
// closed.
func (p *Player) Err() error {
return p.err
}

// Pause temporarily stops the player from emitting events.
// It is a no-op if playback is currently paused.
Expand Down Expand Up @@ -262,7 +274,6 @@ func (p *Player) SetPos(d time.Duration) error {
// applyDelay "sleeps" for d in a manner that
// can be canceled
func (p *Player) applyDelay(d time.Duration) error {
p.log.Debugf("waiting %v until next event", d)
scaled := float64(d) / p.speed.Load().(float64)
select {
case <-p.done:
Expand Down
2 changes: 2 additions & 0 deletions lib/player/player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func TestBasicStream(t *testing.T) {
}

require.Equal(t, 3, count)
require.NoError(t, p.Err())
}

func TestPlayPause(t *testing.T) {
Expand Down Expand Up @@ -163,6 +164,7 @@ func TestClose(t *testing.T) {
// channel should have been closed
_, ok := <-p.C()
require.False(t, ok, "player channel should have been closed")
require.NoError(t, p.Err())
}

func TestSeekForward(t *testing.T) {
Expand Down
Loading

0 comments on commit 66c8038

Please sign in to comment.