Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert the desktop session player to the new player API #34070

Merged
merged 3 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions lib/events/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -957,7 +958,9 @@ func (l *AuditLog) StreamSessionEvents(ctx context.Context, sessionID session.ID
if rmErr := os.Remove(tarballPath); rmErr != nil {
l.log.WithError(rmErr).Warningf("Failed to remove file %v.", tarballPath)
}

if errors.Is(err, fs.ErrNotExist) {
err = trace.NotFound("a recording for session %v was not found", sessionID)
}
e <- trace.Wrap(err)
return c, e
}
Expand All @@ -980,7 +983,7 @@ func (l *AuditLog) StreamSessionEvents(ctx context.Context, sessionID session.ID
for {
if ctx.Err() != nil {
e <- trace.Wrap(ctx.Err())
break
return
}

event, err := protoReader.Read(ctx)
Expand All @@ -990,12 +993,16 @@ func (l *AuditLog) StreamSessionEvents(ctx context.Context, sessionID session.ID
} else {
close(c)
}

break
return
}

if event.GetIndex() >= startIndex {
c <- event
select {
case c <- event:
case <-ctx.Done():
e <- trace.Wrap(ctx.Err())
return
}
}
}
}()
Expand Down
37 changes: 24 additions & 13 deletions lib/player/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"context"
"errors"
"math"
"os"
"sync/atomic"
"time"

Expand Down Expand Up @@ -64,10 +63,15 @@ type Player struct {
// and is inspired by "Rethinking Classical Concurrency Patterns"
// by Bryan C. Mills (GopherCon 2018): https://www.youtube.com/watch?v=5zXAHh5tJqQ
playPause chan chan struct{}

// err holds the error (if any) encountered during playback
err error
}

const normalPlayback = math.MinInt64

// Streamer is the underlying streamer that provides
// access to recorded session events.
type Streamer interface {
StreamSessionEvents(
ctx context.Context,
Expand Down Expand Up @@ -100,10 +104,7 @@ func New(cfg *Config) (*Player, error) {

var log logrus.FieldLogger = cfg.Log
if log == nil {
l := logrus.New().WithField(trace.Component, "player")
l.Logger.SetOutput(os.Stdout) // TODO(zmb3) remove
l.Logger.SetLevel(logrus.DebugLevel)
log = l
log = logrus.New().WithField(trace.Component, "player")
}

p := &Player{
Expand Down Expand Up @@ -137,6 +138,11 @@ const (
maxPlaybackSpeed = 16
)

// SetSpeed adjusts the playback speed of the player.
// It can be called at any time (the player can be in a playing
// or paused state). A speed of 1.0 plays back at regular speed,
// while a speed of 2.0 plays back twice as fast as originally
// recorded. Valid speeds range from 0.25 to 16.0.
func (p *Player) SetSpeed(s float64) error {
if s < minPlaybackSpeed || s > maxPlaybackSpeed {
return trace.BadParameter("speed %v is out of range", s)
Expand All @@ -146,22 +152,24 @@ func (p *Player) SetSpeed(s float64) error {
}

func (p *Player) stream() {
// TODO(zmb3): consider using context instead of close chan
eventsC, errC := p.streamer.StreamSessionEvents(context.TODO(), p.sessionID, 0)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

eventsC, errC := p.streamer.StreamSessionEvents(ctx, p.sessionID, 0)
lastDelay := int64(0)
for {
select {
case <-p.done:
close(p.emit)
return
case err := <-errC:
// TODO(zmb3): figure out how to surface the error
// (probably close the chan and expose a method)
p.log.Warn(err)
p.err = err
close(p.emit)
return
case evt := <-eventsC:
if evt == nil {
p.log.Debug("reached end of playback")
p.log.Debugf("reached end of playback for session %v", p.sessionID)
close(p.emit)
return
}
Expand Down Expand Up @@ -203,7 +211,6 @@ func (p *Player) stream() {
lastDelay = currentDelay
}

p.log.Debugf("playing %v (%v)", evt.GetType(), evt.GetID())
select {
case p.emit <- evt:
p.lastPlayed.Store(currentDelay)
Expand All @@ -229,7 +236,12 @@ func (p *Player) C() <-chan events.AuditEvent {
return p.emit
}

// TODO(zmb3): add an Err() method to be checked after C is closed
// Err returns the error (if any) that occurred during playback.
// It should only be called after the channel returned by [C] is
// closed.
func (p *Player) Err() error {
return p.err
}

// Pause temporarily stops the player from emitting events.
// It is a no-op if playback is currently paused.
Expand Down Expand Up @@ -262,7 +274,6 @@ func (p *Player) SetPos(d time.Duration) error {
// applyDelay "sleeps" for d in a manner that
// can be canceled
func (p *Player) applyDelay(d time.Duration) error {
p.log.Debugf("waiting %v until next event", d)
scaled := float64(d) / p.speed.Load().(float64)
select {
case <-p.done:
Expand Down
2 changes: 2 additions & 0 deletions lib/player/player_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func TestBasicStream(t *testing.T) {
}

require.Equal(t, 3, count)
require.NoError(t, p.Err())
}

func TestPlayPause(t *testing.T) {
Expand Down Expand Up @@ -163,6 +164,7 @@ func TestClose(t *testing.T) {
// channel should have been closed
_, ok := <-p.C()
require.False(t, ok, "player channel should have been closed")
require.NoError(t, p.Err())
}

func TestSeekForward(t *testing.T) {
Expand Down
Loading
Loading