diff --git a/lib/srv/exec.go b/lib/srv/exec.go index e766980a3c68b..b78d7f4c85a36 100644 --- a/lib/srv/exec.go +++ b/lib/srv/exec.go @@ -438,7 +438,7 @@ func emitExecAuditEvent(ctx *ServerContext, cmd string, execErr error) { scpEvent.Code = events.SCPDownloadCode } } - if err := ctx.session.recorder.EmitAuditEvent(ctx.srv.Context(), scpEvent); err != nil { + if err := ctx.session.emitAuditEvent(ctx.srv.Context(), scpEvent); err != nil { log.WithError(err).Warn("Failed to emit scp event.") } } else { @@ -458,7 +458,7 @@ func emitExecAuditEvent(ctx *ServerContext, cmd string, execErr error) { } else { execEvent.Code = events.ExecCode } - if err := ctx.session.recorder.EmitAuditEvent(ctx.srv.Context(), execEvent); err != nil { + if err := ctx.session.emitAuditEvent(ctx.srv.Context(), execEvent); err != nil { log.WithError(err).Warn("Failed to emit exec event.") } } diff --git a/lib/srv/sess.go b/lib/srv/sess.go index f7d8d8c048b52..0e5df8909c333 100644 --- a/lib/srv/sess.go +++ b/lib/srv/sess.go @@ -298,7 +298,7 @@ func (s *SessionRegistry) NotifyWinChange(ctx context.Context, params rsession.T // Report the updated window size to the event log (this is so the sessions // can be replayed correctly). - if err := session.recorder.EmitAuditEvent(s.Srv.Context(), resizeEvent); err != nil { + if err := session.emitAuditEvent(s.Srv.Context(), resizeEvent); err != nil { s.log.WithError(err).Warn("Failed to emit resize audit event.") } @@ -401,7 +401,8 @@ type session struct { // login stores the login of the initial session creator login string - recorder events.StreamWriter + recorderMu sync.RWMutex + recorder events.StreamWriter // hasEnhancedRecording returns true if this session has enhanced session // recording events associated. @@ -571,11 +572,17 @@ func (s *session) PID() int { // Recorder returns a events.SessionRecorder which can be used to emit events // to a session as well as the audit log. func (s *session) Recorder() events.StreamWriter { - s.mu.RLock() - defer s.mu.RUnlock() + s.recorderMu.RLock() + defer s.recorderMu.RUnlock() return s.recorder } +func (s *session) setRecorder(rec events.StreamWriter) { + s.recorderMu.Lock() + defer s.recorderMu.Unlock() + s.recorder = rec +} + // Stop ends the active session and forces all clients to disconnect. // This will trigger background goroutines to complete session cleanup. func (s *session) Stop() { @@ -692,7 +699,7 @@ func (s *session) emitSessionStartEvent(ctx *ServerContext) { sessionStartEvent.ConnectionMetadata.LocalAddr = ctx.ServerConn.LocalAddr().String() } - if err := s.recorder.EmitAuditEvent(ctx.srv.Context(), sessionStartEvent); err != nil { + if err := s.emitAuditEvent(ctx.srv.Context(), sessionStartEvent); err != nil { s.log.WithError(err).Warn("Failed to emit session start event.") } } @@ -722,7 +729,7 @@ func (s *session) emitSessionJoinEvent(ctx *ServerContext) { } // Emit session join event to Audit Log. - if err := s.recorder.EmitAuditEvent(ctx.srv.Context(), sessionJoinEvent); err != nil { + if err := s.emitAuditEvent(ctx.srv.Context(), sessionJoinEvent); err != nil { s.log.WithError(err).Warn("Failed to emit session join event.") } @@ -1064,6 +1071,32 @@ func newRecorder(s *session, ctx *ServerContext) (events.StreamWriter, error) { return rec, nil } +// newEventOnlyRecorder creates a StreamWriter that doesn't record session +// contents. It is used in scenarios where it is not possible to record those +// events. +func newEventOnlyRecorder(s *session, ctx *ServerContext) (events.StreamWriter, error) { + rec, err := events.NewAuditWriter(events.AuditWriterConfig{ + // Audit stream is using server context, not session context, + // to make sure that session is uploaded even after it is closed + Context: ctx.srv.Context(), + // It will use a TeeStreamer where the streamer is a discard, and the + // emitter is the auth server. The TeeStreamer is used to filter events + // that usually are not sent to auth server. + Streamer: events.NewTeeStreamer(events.NewDiscardEmitter(), ctx.srv), + SessionID: s.id, + Clock: s.registry.clock, + Namespace: ctx.srv.GetNamespace(), + ServerID: ctx.srv.HostUUID(), + RecordOutput: ctx.SessionRecordingConfig.GetMode() != types.RecordOff, + Component: teleport.Component(teleport.ComponentSession, ctx.srv.Component()), + ClusterName: ctx.ClusterName, + }) + if err != nil { + return nil, trace.Wrap(err) + } + return rec, nil +} + func (s *session) startExec(ctx context.Context, channel ssh.Channel, scx *ServerContext) error { // Emit a session.start event for the exec session. s.emitSessionStartEvent(scx) @@ -1706,3 +1739,20 @@ func (s *session) trackSession(ctx context.Context, teleportUser string, policyS return nil } + +// emitAuditEvent emits audit events. +func (s *session) emitAuditEvent(ctx context.Context, event apievents.AuditEvent) error { + rec := s.Recorder() + select { + case <-rec.Done(): + newRecorder, err := newEventOnlyRecorder(s, s.scx) + if err != nil { + return trace.ConnectionProblem(err, "failed to recreate audit events recorder") + } + s.setRecorder(newRecorder) + + return trace.Wrap(newRecorder.EmitAuditEvent(ctx, event)) + default: + return trace.Wrap(rec.EmitAuditEvent(ctx, event)) + } +}