Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SSH Session recording modes #12916

Merged
merged 27 commits into from
Jun 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
81c00c0
feat: session recording modes
gabrielcorado Apr 28, 2022
127a522
fix(services): remove undefined constant
gabrielcorado May 26, 2022
2bf5753
refactor(types): change attribute number
gabrielcorado May 26, 2022
f04ad76
chore: fix typos
gabrielcorado May 26, 2022
80f10ff
refactor(events): change `receiveAndUpload` to return error
gabrielcorado May 30, 2022
0e8fabe
refactor(srv): change complete session recording call
gabrielcorado May 30, 2022
94b5195
chore(constants): remove unused constant
gabrielcorado May 30, 2022
5098f6b
feat(srv): wrap errors
gabrielcorado May 30, 2022
ee31f65
chore: fix lint and tests
gabrielcorado May 30, 2022
78a4669
chore(srv): add missing parameter
gabrielcorado May 31, 2022
986e4ba
refactor(filesessions): remove random data generation
gabrielcorado May 31, 2022
3eac532
refactor(events): change last number part increment
gabrielcorado May 31, 2022
83ad1af
chore(filesessions): fix license
gabrielcorado May 31, 2022
e267454
fix(events): increment the last part after upload is complete
gabrielcorado May 31, 2022
9c26d2d
refactor(events): change `cancel` call on auditwriter
gabrielcorado May 31, 2022
bbeed58
fix(srv): use proper identity when creating session recorder
gabrielcorado Jun 1, 2022
cddc48c
Revert "refactor(events): change last number part increment"
gabrielcorado Jun 2, 2022
617530c
tests(integration): add session recording mode
gabrielcorado Jun 3, 2022
6745fa0
chore: apply review suggestions
gabrielcorado Jun 3, 2022
2632353
refactor(filesessions): generate buffer after file is opened
gabrielcorado Jun 3, 2022
bfbd7e1
Merge branch 'master' into gabrielcorado/session-recording-modes
gabrielcorado Jun 3, 2022
7d97b18
Merge branch 'master' into gabrielcorado/session-recording-modes
gabrielcorado Jun 6, 2022
c289e27
chore(events): fix lint
gabrielcorado Jun 6, 2022
38b8554
chore(events): fix lint
gabrielcorado Jun 6, 2022
3f275af
Merge branch 'master' into gabrielcorado/session-recording-modes
gabrielcorado Jun 6, 2022
45cecc0
Merge branch 'master' into gabrielcorado/session-recording-modes
gabrielcorado Jun 6, 2022
2f13359
Merge branch 'master' into gabrielcorado/session-recording-modes
gabrielcorado Jun 6, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions api/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,25 @@ const (
// when HTTPS_PROXY or HTTP_PROXY is ignored
NoProxy = "NO_PROXY"
)

// SessionRecordingService is used to differentiate session recording services.
type SessionRecordingService int

const (
// SessionRecordingServiceSSH represents the SSH service session.
SessionRecordingServiceSSH SessionRecordingService = iota
)

// SessionRecordingMode determines how session recording will behave in failure
// scenarios.
type SessionRecordingMode string

const (
// SessionRecordingModeStrict causes any failure session recording to
// terminate the session or prevent a new session from starting.
SessionRecordingModeStrict = SessionRecordingMode("strict")

// SessionRecordingModeBestEffort allows the session to keep going even when
// session recording fails.
SessionRecordingModeBestEffort = SessionRecordingMode("best_effort")
)
1 change: 1 addition & 0 deletions api/types/role.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ func (r *RoleV5) CheckAndSetDefaults() error {
if r.Spec.Options.RecordSession == nil {
r.Spec.Options.RecordSession = &RecordSession{
Desktop: NewBoolOption(true),
Default: constants.SessionRecordingModeBestEffort,
}
}
if r.Spec.Options.DesktopClipboard == nil {
Expand Down
1,752 changes: 923 additions & 829 deletions api/types/types.pb.go

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions api/types/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1696,6 +1696,20 @@ message RecordSession {
(gogoproto.jsontag) = "desktop",
(gogoproto.customtype) = "BoolOption"
];

// Default indicates the default value for the services.
string Default = 2 [
(gogoproto.jsontag) = "default,omitempty",
(gogoproto.casttype) =
"github.com/gravitational/teleport/api/constants.SessionRecordingMode"
];

// SSH indicates the session mode used on SSH sessions.
string SSH = 3 [
(gogoproto.jsontag) = "ssh,omitempty",
(gogoproto.casttype) =
"github.com/gravitational/teleport/api/constants.SessionRecordingMode"
];
}

// CertExtensionMode specifies the type of extension to use in the cert.
Expand Down
162 changes: 162 additions & 0 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"golang.org/x/crypto/ssh"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/constants"
apidefaults "github.com/gravitational/teleport/api/defaults"
tracessh "github.com/gravitational/teleport/api/observability/tracing/ssh"
"github.com/gravitational/teleport/api/profile"
Expand All @@ -58,6 +59,7 @@ import (
"github.com/gravitational/teleport/lib/bpf"
"github.com/gravitational/teleport/lib/client"
"github.com/gravitational/teleport/lib/events"
"github.com/gravitational/teleport/lib/events/filesessions"
"github.com/gravitational/teleport/lib/pam"
"github.com/gravitational/teleport/lib/reversetunnel"
"github.com/gravitational/teleport/lib/service"
Expand Down Expand Up @@ -205,6 +207,7 @@ func TestIntegrations(t *testing.T) {
t.Run("WindowChange", suite.bind(testWindowChange))
t.Run("SSHTracker", suite.bind(testSSHTracker))
t.Run("TestKubeAgentFiltering", suite.bind(testKubeAgentFiltering))
t.Run("SessionRecordingModes", suite.bind(testSessionRecordingModes))
}

// testAuditOn creates a live session, records a bunch of data through it
Expand Down Expand Up @@ -850,6 +853,165 @@ func testInteractiveReverseTunnel(t *testing.T, suite *integrationTestSuite) {
verifySessionJoin(t, suite.me.Username, teleport)
}

func testSessionRecordingModes(t *testing.T, suite *integrationTestSuite) {
tr := utils.NewTracer(utils.ThisFunction()).Start()
defer tr.Stop()

recConfig, err := types.NewSessionRecordingConfigFromConfigFile(types.SessionRecordingConfigSpecV2{
Mode: types.RecordAtNode,
})
require.NoError(t, err)

// Enable session recording on node.
cfg := suite.defaultServiceConfig()
cfg.Auth.Enabled = true
cfg.Auth.SessionRecordingConfig = recConfig
cfg.Proxy.DisableWebService = true
cfg.Proxy.DisableWebInterface = true
cfg.Proxy.Enabled = true
cfg.SSH.Enabled = true

teleport := suite.newTeleportWithConfig(t, nil, nil, cfg)
defer teleport.StopAll()

// startSession starts an interactive session, users must terminate the
// session by typing "exit" in the terminal.
startSession := func(username string) (*Terminal, chan error) {
term := NewTerminal(250)
errCh := make(chan error)

go func() {
cl, err := teleport.NewClient(ClientConfig{
Login: username,
Cluster: Site,
Host: Host,
})
if err != nil {
errCh <- trace.Wrap(err)
return
}
cl.Stdout = term
cl.Stdin = term

errCh <- cl.SSH(context.TODO(), []string{}, false)
}()

return term, errCh
}

waitSessionTermination := func(t *testing.T, errCh chan error, errorAssertion require.ErrorAssertionFunc) {
require.Eventually(t, func() bool {
select {
case err := <-errCh:
errorAssertion(t, err)
return true
default:
return false
}
}, 10*time.Second, 500*time.Millisecond)
}

// enableDiskFailure changes the OpenFileFunc on filesession package. The
// replace function will always return an error when called.
enableDiskFailure := func() {
filesessions.SetOpenFileFunc(func(path string, _ int, _ os.FileMode) (*os.File, error) {
return nil, fmt.Errorf("failed to open file %q", path)
})
}

// disableDiskFailure restore the OpenFileFunc.
disableDiskFailure := func() {
filesessions.SetOpenFileFunc(os.OpenFile)
}

for name, test := range map[string]struct {
recordingMode constants.SessionRecordingMode
expectSessionFailure bool
}{
"BestEffortMode": {
recordingMode: constants.SessionRecordingModeBestEffort,
expectSessionFailure: false,
},
"StrictMode": {
recordingMode: constants.SessionRecordingModeStrict,
expectSessionFailure: true,
},
} {
t.Run(name, func(t *testing.T) {
// Setup user and session recording mode.
username := suite.me.Username
role, err := types.NewRoleV3("devs", types.RoleSpecV5{
Allow: types.RoleConditions{
Logins: []string{username},
},
Options: types.RoleOptions{
RecordSession: &types.RecordSession{
SSH: test.recordingMode,
},
},
})
require.NoError(t, err)
require.NoError(t, SetupUser(teleport.Process, username, []types.Role{role}))

t.Run("BeforeStartFailure", func(t *testing.T) {
// Enable disk failure.
enableDiskFailure()
defer disableDiskFailure()

// Start session.
term, errCh := startSession(username)
if test.expectSessionFailure {
waitSessionTermination(t, errCh, require.Error)
return
}

// Send stuff to the session.
term.Type("echo Hello\n\r")

// Guarantee the session hasn't stopped after typing.
select {
case <-errCh:
require.Fail(t, "session was closed before")
default:
}

// Wait for the session to terminate without error.
term.Type("exit\n\r")
waitSessionTermination(t, errCh, require.NoError)
})

t.Run("MidSessionFailure", func(t *testing.T) {
// Start session.
term, errCh := startSession(username)

// Guarantee the session started properly.
select {
case <-errCh:
require.Fail(t, "session was closed before")
default:
}

// Enable disk failure
enableDiskFailure()
defer disableDiskFailure()

// Send stuff to the session.
term.Type("echo Hello\n\r")

// Expect the session to fail
if test.expectSessionFailure {
waitSessionTermination(t, errCh, require.Error)
return
}

// Wait for the session to terminate without error.
term.Type("exit\n\r")
waitSessionTermination(t, errCh, require.NoError)
})
})
}
}

// TestCustomReverseTunnel tests that the SSH node falls back to configured
// proxy address if it cannot connect via the proxy address from the reverse
// tunnel discovery query.
Expand Down
3 changes: 3 additions & 0 deletions lib/events/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,9 @@ type MultipartUploader interface {
CreateUpload(ctx context.Context, sessionID session.ID) (*StreamUpload, error)
// CompleteUpload completes the upload
CompleteUpload(ctx context.Context, upload StreamUpload, parts []StreamPart) error
// ReserveUploadPart reserves an upload part. Reserve is used to identify
// upload errors beforehand.
ReserveUploadPart(ctx context.Context, upload StreamUpload, partNumber int64) error
// UploadPart uploads part and returns the part
UploadPart(ctx context.Context, upload StreamUpload, partNumber int64, partBody io.ReadSeeker) (*StreamPart, error)
// ListParts returns all uploaded parts for the completed upload in sorted order
Expand Down
47 changes: 31 additions & 16 deletions lib/events/auditwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package events

import (
"context"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -414,6 +415,8 @@ func (a *AuditWriter) Complete(ctx context.Context) error {
}

func (a *AuditWriter) processEvents() {
defer a.cancel()

for {
// From the spec:
//
Expand All @@ -439,27 +442,29 @@ func (a *AuditWriter) processEvents() {
case event := <-a.eventsCh:
a.buffer = append(a.buffer, event)
err := a.stream.EmitAuditEvent(a.cfg.Context, event)
if err == nil {
continue
}
if IsPermanentEmitError(err) {
a.log.WithError(err).WithField("event", event).
Warning("Failed to emit audit event due to permanent emit audit event error. Event will be omitted.")
continue
}
a.log.WithError(err).Debug("Failed to emit audit event, attempting to recover stream.")
start := time.Now()
if err := a.recoverStream(); err != nil {
a.log.WithError(err).Warningf("Failed to recover stream.")
a.cancel()
return
if err != nil {
if IsPermanentEmitError(err) {
a.log.WithError(err).WithField("event", event).Warning("Failed to emit audit event due to permanent emit audit event error. Event will be omitted.")
continue
}

if isUnrecoverableError(err) {
a.log.WithError(err).Debug("Failed to emit audit event.")
return
}

a.log.WithError(err).Debug("Failed to emit audit event, attempting to recover stream.")
start := time.Now()
if err := a.recoverStream(); err != nil {
a.log.WithError(err).Warningf("Failed to recover stream.")
return
}
a.log.Debugf("Recovered stream in %v.", time.Since(start))
}
a.log.Debugf("Recovered stream in %v.", time.Since(start))
case <-a.stream.Done():
a.log.Debugf("Stream was closed by the server, attempting to recover.")
if err := a.recoverStream(); err != nil {
a.log.WithError(err).Warningf("Failed to recover stream.")
a.cancel()
return
}
case <-a.closeCtx.Done():
Expand Down Expand Up @@ -585,6 +590,11 @@ func (a *AuditWriter) tryResumeStream() (apievents.Stream, error) {
return nil, trace.ConnectionProblem(a.closeCtx.Err(), "operation has been canceled")
}
}

if isUnrecoverableError(err) {
return nil, trace.ConnectionProblem(err, "stream cannot be recovered")
}

select {
case <-retry.After():
a.log.WithError(err).Debug("Retrying to resume stream after backoff.")
Expand Down Expand Up @@ -657,3 +667,8 @@ func diff(before, after time.Time) int64 {
}
return d
}

// isUnrecoverableError returns if the provided stream error is unrecoverable.
func isUnrecoverableError(err error) bool {
return strings.Contains(err.Error(), uploaderReservePartErrorMessage)
}
30 changes: 30 additions & 0 deletions lib/events/auditwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package events
import (
"bytes"
"context"
"errors"
"io"
"math/rand"
"testing"
Expand Down Expand Up @@ -287,6 +288,35 @@ func TestAuditWriter(t *testing.T) {
require.Equal(t, event.GetClusterName(), "cluster")
}
})

t.Run("NonRecoverable", func(t *testing.T) {
test := newAuditWriterTest(t, func(streamer Streamer) (*CallbackStreamer, error) {
return NewCallbackStreamer(CallbackStreamerConfig{
Inner: streamer,
OnEmitAuditEvent: func(_ context.Context, _ session.ID, _ apievents.AuditEvent) error {
// Returns an unrecoverable error.
return errors.New(uploaderReservePartErrorMessage)
},
})
})

// First event will not fail since it is processed in the goroutine.
events := GenerateTestSession(SessionParams{SessionID: string(test.sid)})
require.NoError(t, test.writer.EmitAuditEvent(test.ctx, events[1]))

// Subsequent events will fail.
err := test.writer.EmitAuditEvent(test.ctx, events[1])
require.Error(t, err)

require.Eventually(t, func() bool {
select {
case <-test.writer.Done():
return true
default:
return false
}
}, 300*time.Millisecond, 100*time.Millisecond)
})
}

func TestBytesToSessionPrintEvents(t *testing.T) {
Expand Down
Loading