Skip to content

Commit

Permalink
backport part of #12916 to get tests to pass
Browse files Browse the repository at this point in the history
  • Loading branch information
rosstimothy committed Dec 20, 2022
1 parent d0e21bd commit 12a9755
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 8 deletions.
4 changes: 2 additions & 2 deletions lib/srv/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func emitExecAuditEvent(ctx *ServerContext, cmd string, execErr error) {
scpEvent.Code = events.SCPDownloadCode
}
}
if err := ctx.session.recorder.EmitAuditEvent(ctx.srv.Context(), scpEvent); err != nil {
if err := ctx.session.emitAuditEvent(ctx.srv.Context(), scpEvent); err != nil {
log.WithError(err).Warn("Failed to emit scp event.")
}
} else {
Expand All @@ -458,7 +458,7 @@ func emitExecAuditEvent(ctx *ServerContext, cmd string, execErr error) {
} else {
execEvent.Code = events.ExecCode
}
if err := ctx.session.recorder.EmitAuditEvent(ctx.srv.Context(), execEvent); err != nil {
if err := ctx.session.emitAuditEvent(ctx.srv.Context(), execEvent); err != nil {
log.WithError(err).Warn("Failed to emit exec event.")
}
}
Expand Down
62 changes: 56 additions & 6 deletions lib/srv/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (s *SessionRegistry) NotifyWinChange(ctx context.Context, params rsession.T

// Report the updated window size to the event log (this is so the sessions
// can be replayed correctly).
if err := session.recorder.EmitAuditEvent(s.Srv.Context(), resizeEvent); err != nil {
if err := session.emitAuditEvent(s.Srv.Context(), resizeEvent); err != nil {
s.log.WithError(err).Warn("Failed to emit resize audit event.")
}

Expand Down Expand Up @@ -401,7 +401,8 @@ type session struct {
// login stores the login of the initial session creator
login string

recorder events.StreamWriter
recorderMu sync.RWMutex
recorder events.StreamWriter

// hasEnhancedRecording returns true if this session has enhanced session
// recording events associated.
Expand Down Expand Up @@ -571,11 +572,17 @@ func (s *session) PID() int {
// Recorder returns a events.SessionRecorder which can be used to emit events
// to a session as well as the audit log.
func (s *session) Recorder() events.StreamWriter {
s.mu.RLock()
defer s.mu.RUnlock()
s.recorderMu.RLock()
defer s.recorderMu.RUnlock()
return s.recorder
}

func (s *session) setRecorder(rec events.StreamWriter) {
s.recorderMu.Lock()
defer s.recorderMu.Unlock()
s.recorder = rec
}

// Stop ends the active session and forces all clients to disconnect.
// This will trigger background goroutines to complete session cleanup.
func (s *session) Stop() {
Expand Down Expand Up @@ -692,7 +699,7 @@ func (s *session) emitSessionStartEvent(ctx *ServerContext) {
sessionStartEvent.ConnectionMetadata.LocalAddr = ctx.ServerConn.LocalAddr().String()
}

if err := s.recorder.EmitAuditEvent(ctx.srv.Context(), sessionStartEvent); err != nil {
if err := s.emitAuditEvent(ctx.srv.Context(), sessionStartEvent); err != nil {
s.log.WithError(err).Warn("Failed to emit session start event.")
}
}
Expand Down Expand Up @@ -722,7 +729,7 @@ func (s *session) emitSessionJoinEvent(ctx *ServerContext) {
}

// Emit session join event to Audit Log.
if err := s.recorder.EmitAuditEvent(ctx.srv.Context(), sessionJoinEvent); err != nil {
if err := s.emitAuditEvent(ctx.srv.Context(), sessionJoinEvent); err != nil {
s.log.WithError(err).Warn("Failed to emit session join event.")
}

Expand Down Expand Up @@ -1064,6 +1071,32 @@ func newRecorder(s *session, ctx *ServerContext) (events.StreamWriter, error) {
return rec, nil
}

// newEventOnlyRecorder creates a StreamWriter that doesn't record session
// contents. It is used in scenarios where it is not possible to record those
// events.
func newEventOnlyRecorder(s *session, ctx *ServerContext) (events.StreamWriter, error) {
rec, err := events.NewAuditWriter(events.AuditWriterConfig{
// Audit stream is using server context, not session context,
// to make sure that session is uploaded even after it is closed
Context: ctx.srv.Context(),
// It will use a TeeStreamer where the streamer is a discard, and the
// emitter is the auth server. The TeeStreamer is used to filter events
// that usually are not sent to auth server.
Streamer: events.NewTeeStreamer(events.NewDiscardEmitter(), ctx.srv),
SessionID: s.id,
Clock: s.registry.clock,
Namespace: ctx.srv.GetNamespace(),
ServerID: ctx.srv.HostUUID(),
RecordOutput: ctx.SessionRecordingConfig.GetMode() != types.RecordOff,
Component: teleport.Component(teleport.ComponentSession, ctx.srv.Component()),
ClusterName: ctx.ClusterName,
})
if err != nil {
return nil, trace.Wrap(err)
}
return rec, nil
}

func (s *session) startExec(ctx context.Context, channel ssh.Channel, scx *ServerContext) error {
// Emit a session.start event for the exec session.
s.emitSessionStartEvent(scx)
Expand Down Expand Up @@ -1706,3 +1739,20 @@ func (s *session) trackSession(ctx context.Context, teleportUser string, policyS

return nil
}

// emitAuditEvent emits audit events.
func (s *session) emitAuditEvent(ctx context.Context, event apievents.AuditEvent) error {
rec := s.Recorder()
select {
case <-rec.Done():
newRecorder, err := newEventOnlyRecorder(s, s.scx)
if err != nil {
return trace.ConnectionProblem(err, "failed to recreate audit events recorder")
}
s.setRecorder(newRecorder)

return trace.Wrap(newRecorder.EmitAuditEvent(ctx, event))
default:
return trace.Wrap(rec.EmitAuditEvent(ctx, event))
}
}

0 comments on commit 12a9755

Please sign in to comment.