Skip to content

Commit

Permalink
[v9] Prevent "session.start" from being overwritten by "session.exec" (
Browse files Browse the repository at this point in the history
…#19499)

The `session.exec` event was not being passed through the session
recorder, which resulted in said event having an event index of 0.
This caused the original `session.start` event which also has an
`eid` of 0 to be overwritten by the `session.exec` event.

By emitting the `session.exec` event via the same mechanism as the
`session.start` event it gets a proper event index and no longer
overwrites the `session.start`.

Closes #13622
  • Loading branch information
rosstimothy authored Dec 21, 2022
1 parent b7c1eff commit dda4160
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 38 deletions.
4 changes: 4 additions & 0 deletions lib/srv/ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ type IdentityContext struct {
// and other resources. SessionContext also holds a ServerContext which can be
// used to access resources on the underlying server. SessionContext can also
// be used to attach resources that should be closed once the session closes.
//
// Any events that need to be recorded should be emitted via session and not
// ServerContext directly. Failure to use the session emitted will result in
// incorrect event indexes that may ultimately cause events to be overwritten.
type ServerContext struct {
// ConnectionContext is the parent context which manages connection-level
// resources.
Expand Down
13 changes: 10 additions & 3 deletions lib/srv/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,10 +361,16 @@ func (e *remoteExec) PID() int {
return 0
}

// emitExecAuditEvent emits either an SCP or exec event based on the
// command run.
//
// Note: to ensure that the event is recorded ctx.session must be used
// instead of ctx.srv.
func emitExecAuditEvent(ctx *ServerContext, cmd string, execErr error) {
// Create common fields for event.
serverMeta := apievents.ServerMetadata{
ServerID: ctx.srv.HostUUID(),
ServerHostname: ctx.srv.GetInfo().GetHostname(),
ServerNamespace: ctx.srv.GetNamespace(),
}

Expand Down Expand Up @@ -432,13 +438,14 @@ func emitExecAuditEvent(ctx *ServerContext, cmd string, execErr error) {
scpEvent.Code = events.SCPDownloadCode
}
}
if err := ctx.srv.EmitAuditEvent(ctx.srv.Context(), scpEvent); err != nil {
if err := ctx.session.emitAuditEvent(ctx.srv.Context(), scpEvent); err != nil {
log.WithError(err).Warn("Failed to emit scp event.")
}
} else {
execEvent := &apievents.Exec{
Metadata: apievents.Metadata{
Type: events.ExecEvent,
Type: events.ExecEvent,
ClusterName: ctx.ClusterName,
},
ServerMetadata: serverMeta,
SessionMetadata: sessionMeta,
Expand All @@ -451,7 +458,7 @@ func emitExecAuditEvent(ctx *ServerContext, cmd string, execErr error) {
} else {
execEvent.Code = events.ExecCode
}
if err := ctx.srv.EmitAuditEvent(ctx.srv.Context(), execEvent); err != nil {
if err := ctx.session.emitAuditEvent(ctx.srv.Context(), execEvent); err != nil {
log.WithError(err).Warn("Failed to emit exec event.")
}
}
Expand Down
53 changes: 46 additions & 7 deletions lib/srv/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/crypto/ssh"

"github.com/gravitational/teleport"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/events/eventstest"
"github.com/gravitational/teleport/lib/sshutils"
"github.com/gravitational/teleport/lib/utils"

"github.com/stretchr/testify/require"
"golang.org/x/crypto/ssh"
)

// TestMain will re-execute Teleport to run a command if "exec" is passed to
Expand Down Expand Up @@ -82,7 +84,7 @@ func TestOSCommandPrep(t *testing.T) {
fmt.Sprintf("SSH_TTY=%v", scx.session.term.TTY().Name()),
"SSH_SESSION_ID=xxx",
"SSH_SESSION_WEBPROXY_ADDR=<proxyhost>:3080",
"SSH_TELEPORT_HOST_UUID=test",
"SSH_TELEPORT_HOST_UUID=testID",
"SSH_TELEPORT_CLUSTER_NAME=localhost",
"SSH_TELEPORT_USER=teleportUser",
}
Expand Down Expand Up @@ -150,6 +152,27 @@ func TestEmitExecAuditEvent(t *testing.T) {
srv := newMockServer(t)
scx := newExecServerContext(t, srv)

rec, ok := scx.session.recorder.(*mockRecorder)
require.True(t, ok)

expectedUsr, err := user.Current()
require.NoError(t, err)
expectedHostname, err := os.Hostname()
if err != nil {
expectedHostname = "localhost"
}

expectedMeta := apievents.UserMetadata{
User: "teleportUser",
Login: expectedUsr.Username,
Impersonator: "",
AWSRoleARN: "",
AccessRequests: []string(nil),
XXX_NoUnkeyedLiteral: struct{}{},
XXX_unrecognized: []uint8(nil),
XXX_sizecache: 0,
}

var tests = []struct {
inCommand string
inError error
Expand Down Expand Up @@ -180,9 +203,19 @@ func TestEmitExecAuditEvent(t *testing.T) {
}
for _, tt := range tests {
emitExecAuditEvent(scx, tt.inCommand, tt.inError)
execEvent := srv.MockEmitter.LastEvent().(*apievents.Exec)

evt := <-rec.emitter.C()
execEvent := evt.(*apievents.Exec)
require.Equal(t, tt.outCommand, execEvent.Command)
require.Equal(t, tt.outCode, execEvent.ExitCode)
require.Equal(t, expectedMeta, execEvent.UserMetadata)
require.Equal(t, "testHostUUID", execEvent.ServerID)
require.Equal(t, expectedHostname, execEvent.ServerHostname)
require.Equal(t, "testNamespace", execEvent.ServerNamespace)
require.Equal(t, "xxx", execEvent.SessionID)
require.Equal(t, "10.0.0.5:4817", execEvent.RemoteAddr)
require.Equal(t, "127.0.0.1:3022", execEvent.LocalAddr)
require.NotZero(t, events.EventID)
}
}

Expand Down Expand Up @@ -241,8 +274,14 @@ func newExecServerContext(t *testing.T, srv Server) *ServerContext {
require.NoError(t, err)
term.SetTermType("xterm")

scx.session = &session{id: "xxx"}
scx.session.term = term
scx.session = &session{
id: "xxx",
term: term,
recorder: &mockRecorder{
done: false,
emitter: eventstest.NewChannelEmitter(10),
},
}
err = scx.SetSSHRequest(&ssh.Request{Type: sshutils.ExecRequest})
require.NoError(t, err)

Expand Down
8 changes: 4 additions & 4 deletions lib/srv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,23 +143,23 @@ type mockServer struct {

// ID is the unique ID of the server.
func (m *mockServer) ID() string {
return "test"
return "testID"
}

// HostUUID is the UUID of the underlying host. For the forwarding
// server this is the proxy the forwarding server is running in.
func (m *mockServer) HostUUID() string {
return "test"
return "testHostUUID"
}

// GetNamespace returns the namespace the server was created in.
func (m *mockServer) GetNamespace() string {
return "test"
return "testNamespace"
}

// AdvertiseAddr is the publicly addressable address of this server.
func (m *mockServer) AdvertiseAddr() string {
return "test"
return "testNamespace"
}

// Component is the type of server, forwarding or regular.
Expand Down
66 changes: 58 additions & 8 deletions lib/srv/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func (s *SessionRegistry) NotifyWinChange(ctx context.Context, params rsession.T

// Report the updated window size to the event log (this is so the sessions
// can be replayed correctly).
if err := session.recorder.EmitAuditEvent(s.Srv.Context(), resizeEvent); err != nil {
if err := session.emitAuditEvent(s.Srv.Context(), resizeEvent); err != nil {
s.log.WithError(err).Warn("Failed to emit resize audit event.")
}

Expand Down Expand Up @@ -401,7 +401,8 @@ type session struct {
// login stores the login of the initial session creator
login string

recorder events.StreamWriter
recorderMu sync.RWMutex
recorder events.StreamWriter

// hasEnhancedRecording returns true if this session has enhanced session
// recording events associated.
Expand Down Expand Up @@ -571,11 +572,17 @@ func (s *session) PID() int {
// Recorder returns a events.SessionRecorder which can be used to emit events
// to a session as well as the audit log.
func (s *session) Recorder() events.StreamWriter {
s.mu.RLock()
defer s.mu.RUnlock()
s.recorderMu.RLock()
defer s.recorderMu.RUnlock()
return s.recorder
}

func (s *session) setRecorder(rec events.StreamWriter) {
s.recorderMu.Lock()
defer s.recorderMu.Unlock()
s.recorder = rec
}

// Stop ends the active session and forces all clients to disconnect.
// This will trigger background goroutines to complete session cleanup.
func (s *session) Stop() {
Expand Down Expand Up @@ -692,7 +699,7 @@ func (s *session) emitSessionStartEvent(ctx *ServerContext) {
sessionStartEvent.ConnectionMetadata.LocalAddr = ctx.ServerConn.LocalAddr().String()
}

if err := s.recorder.EmitAuditEvent(ctx.srv.Context(), sessionStartEvent); err != nil {
if err := s.emitAuditEvent(ctx.srv.Context(), sessionStartEvent); err != nil {
s.log.WithError(err).Warn("Failed to emit session start event.")
}
}
Expand Down Expand Up @@ -722,7 +729,7 @@ func (s *session) emitSessionJoinEvent(ctx *ServerContext) {
}

// Emit session join event to Audit Log.
if err := s.recorder.EmitAuditEvent(ctx.srv.Context(), sessionJoinEvent); err != nil {
if err := s.emitAuditEvent(ctx.srv.Context(), sessionJoinEvent); err != nil {
s.log.WithError(err).Warn("Failed to emit session join event.")
}

Expand Down Expand Up @@ -761,7 +768,7 @@ func (s *session) emitSessionLeaveEvent(ctx *ServerContext) {
}

// Emit session leave event to Audit Log.
if err := s.recorder.EmitAuditEvent(ctx.srv.Context(), sessionLeaveEvent); err != nil {
if err := s.emitAuditEvent(ctx.srv.Context(), sessionLeaveEvent); err != nil {
s.log.WithError(err).Warn("Failed to emit session leave event.")
}

Expand Down Expand Up @@ -828,7 +835,7 @@ func (s *session) emitSessionEndEvent() {
sessionEndEvent.Participants = []string{s.scx.Identity.TeleportUser}
}

if err := s.recorder.EmitAuditEvent(ctx.srv.Context(), sessionEndEvent); err != nil {
if err := s.emitAuditEvent(ctx.srv.Context(), sessionEndEvent); err != nil {
s.log.WithError(err).Warn("Failed to emit session end event.")
}
}
Expand Down Expand Up @@ -1064,6 +1071,32 @@ func newRecorder(s *session, ctx *ServerContext) (events.StreamWriter, error) {
return rec, nil
}

// newEventOnlyRecorder creates a StreamWriter that doesn't record session
// contents. It is used in scenarios where it is not possible to record those
// events.
func newEventOnlyRecorder(s *session, ctx *ServerContext) (events.StreamWriter, error) {
rec, err := events.NewAuditWriter(events.AuditWriterConfig{
// Audit stream is using server context, not session context,
// to make sure that session is uploaded even after it is closed
Context: ctx.srv.Context(),
// It will use a TeeStreamer where the streamer is a discard, and the
// emitter is the auth server. The TeeStreamer is used to filter events
// that usually are not sent to auth server.
Streamer: events.NewTeeStreamer(events.NewDiscardEmitter(), ctx.srv),
SessionID: s.id,
Clock: s.registry.clock,
Namespace: ctx.srv.GetNamespace(),
ServerID: ctx.srv.HostUUID(),
RecordOutput: ctx.SessionRecordingConfig.GetMode() != types.RecordOff,
Component: teleport.Component(teleport.ComponentSession, ctx.srv.Component()),
ClusterName: ctx.ClusterName,
})
if err != nil {
return nil, trace.Wrap(err)
}
return rec, nil
}

func (s *session) startExec(ctx context.Context, channel ssh.Channel, scx *ServerContext) error {
// Emit a session.start event for the exec session.
s.emitSessionStartEvent(scx)
Expand Down Expand Up @@ -1706,3 +1739,20 @@ func (s *session) trackSession(ctx context.Context, teleportUser string, policyS

return nil
}

// emitAuditEvent emits audit events.
func (s *session) emitAuditEvent(ctx context.Context, event apievents.AuditEvent) error {
rec := s.Recorder()
select {
case <-rec.Done():
newRecorder, err := newEventOnlyRecorder(s, s.scx)
if err != nil {
return trace.ConnectionProblem(err, "failed to recreate audit events recorder")
}
s.setRecorder(newRecorder)

return trace.Wrap(newRecorder.EmitAuditEvent(ctx, event))
default:
return trace.Wrap(rec.EmitAuditEvent(ctx, event))
}
}
40 changes: 24 additions & 16 deletions lib/srv/sess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,22 @@ import (
"testing"
"time"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"golang.org/x/crypto/ssh"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/types"
apievents "github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/defaults"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/events/eventstest"
"github.com/gravitational/teleport/lib/modules"
"github.com/gravitational/teleport/lib/services"
rsession "github.com/gravitational/teleport/lib/session"
"github.com/gravitational/teleport/lib/utils"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
)

func TestParseAccessRequestIDs(t *testing.T) {
Expand Down Expand Up @@ -246,17 +246,6 @@ func TestInteractiveSession(t *testing.T) {
}
require.Eventually(t, sessionClosed, time.Second*15, time.Millisecond*500)
})

t.Run("BrokenRecorder", func(t *testing.T) {
t.Parallel()
sess, _ := testOpenSession(t, reg, nil)

// The recorder might be closed in the case of an error downstream.
// Closing the session recorder should result in the session ending.
err := sess.recorder.Close(context.Background())
require.NoError(t, err)
require.Eventually(t, sess.isStopped, time.Second*5, time.Millisecond*500)
})
}

// TestStopUnstarted tests that a session may be stopped before it launches.
Expand Down Expand Up @@ -405,6 +394,25 @@ func testOpenSession(t *testing.T, reg *SessionRegistry, roleSet services.RoleSe
return scx.session, sshChanOpen
}

type mockRecorder struct {
events.StreamWriter
emitter *eventstest.ChannelEmitter
done bool
}

func (m *mockRecorder) Done() <-chan struct{} {
ch := make(chan struct{})
if m.done {
close(ch)
}

return ch
}

func (m *mockRecorder) EmitAuditEvent(ctx context.Context, event apievents.AuditEvent) error {
return m.emitter.EmitAuditEvent(ctx, event)
}

type trackerService struct {
created int32
createError error
Expand Down

0 comments on commit dda4160

Please sign in to comment.