Skip to content

Commit

Permalink
roachprod: mock session
Browse files Browse the repository at this point in the history
Previously, it was difficult to test code that creates a remote session.

This change adds a mock implementation of session, and the hooks in
SyncedCluster to use it during tests.

The mock session provides several hooks for controlling the IO of a session.
The inputs and outputs are backed by a channel to allow finer control of the
flow of data.

Currently, it only implements what was required to test the Monitor
functionality, but can be extended in the future to cover more use cases.

Informs: cockroachdb#118214
Epic: None
  • Loading branch information
herkolategan committed Dec 9, 2024
1 parent a77d6ae commit 05da5c0
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 0 deletions.
1 change: 1 addition & 0 deletions pkg/roachprod/install/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ go_test(
"cluster_synced_test.go",
"cockroach_test.go",
"services_test.go",
"session_test.go",
"staging_test.go",
"start_template_test.go",
],
Expand Down
7 changes: 7 additions & 0 deletions pkg/roachprod/install/cluster_synced.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ type SyncedCluster struct {

// AuthorizedKeys is used by SetupSSH to add additional authorized keys.
AuthorizedKeys []byte

// sessionProvider is a function that returns a new session. It serves as a
// testing hook, and if null the default session implementations will be used.
sessionProvider func(node Node, cmd string) session
}

// NewSyncedCluster creates a SyncedCluster, given the cluster metadata, node
Expand Down Expand Up @@ -385,6 +389,9 @@ func (c *SyncedCluster) validateHost(ctx context.Context, l *logger.Logger, node
func (c *SyncedCluster) newSession(
l *logger.Logger, node Node, cmd string, options ...remoteSessionOption,
) session {
if c.sessionProvider != nil {
return c.sessionProvider(node, cmd)
}
if c.IsLocal() {
return newLocalSession(cmd)
}
Expand Down
142 changes: 142 additions & 0 deletions pkg/roachprod/install/session_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package install

import (
"context"
"io"
"sync/atomic"
)

type MockSession struct {
OnCombinedOutput func(context.Context) ([]byte, error)
OnRun func(context.Context) error
OnStart func(context.Context) error
OnWait func() error

Stdout io.ReadWriteCloser
Stderr io.ReadWriteCloser
}

type MockSessionOptions struct {
ChannelSize int
}

// dataChannel emulates blocking reads and writes with a channel.
type dataChannel struct {
dataChan chan []byte
closed atomic.Bool
}

// MockSession implements the session interface.
var _ session = &MockSession{}

func (m MockSession) CombinedOutput(ctx context.Context) ([]byte, error) {
if m.OnCombinedOutput != nil {
return m.OnCombinedOutput(ctx)
}
return nil, nil
}

func (m MockSession) Run(ctx context.Context) error {
if m.OnRun != nil {
return m.OnRun(ctx)
}
return nil
}

func (m MockSession) Start() error {
if m.OnStart != nil {
return m.OnStart(context.Background())
}
return nil
}

func (m MockSession) RequestPty() error {
return nil
}

func (m MockSession) Wait() error {
if m.OnWait != nil {
return m.OnWait()
}
return nil
}

func (m MockSession) Close() {
_ = m.Stdout.Close()
_ = m.Stderr.Close()
}

func (m MockSession) SetStdin(_ io.Reader) {
panic("not implemented")
}

func (m MockSession) SetStdout(_ io.Writer) {
panic("not implemented")
}

func (m MockSession) SetStderr(_ io.Writer) {
panic("not implemented")
}

func (m MockSession) StdinPipe() (io.WriteCloser, error) {
panic("not implemented")
}

func (m MockSession) StdoutPipe() (io.Reader, error) {
return m.Stdout, nil
}

func (m MockSession) StderrPipe() (io.Reader, error) {
return m.Stderr, nil
}

// NewMockSession creates a new mock session. The mock session can be used to
// simulate the Stdout and Stderr of a session. Stdout and Stderr are backed by
// channels, that are unbuffered by default.
func NewMockSession(opts MockSessionOptions) *MockSession {
return &MockSession{
Stdout: newDataChannel(opts.ChannelSize),
Stderr: newDataChannel(opts.ChannelSize),
}
}

// DefaultMockSessionOptions returns the default options for a mock session.
func DefaultMockSessionOptions() MockSessionOptions {
return MockSessionOptions{}
}

func newDataChannel(channelSize int) io.ReadWriteCloser {
return &dataChannel{dataChan: make(chan []byte, channelSize)}
}

// Close is a no-op if the channel is already closed. This allows tests or the
// implementation to close the channel without panicking.
func (b *dataChannel) Close() error {
if b.closed.Swap(true) {
return nil
}
close(b.dataChan)
return nil
}

func (b *dataChannel) Write(p []byte) (n int, err error) {
b.dataChan <- p
return len(p), nil
}

func (b *dataChannel) Read(p []byte) (n int, err error) {
data := <-b.dataChan
if data == nil {
return 0, io.EOF
}
copiedLen := copy(p, data)
if copiedLen < len(data) {
b.dataChan <- data[copiedLen:]
}
return copiedLen, nil
}

0 comments on commit 05da5c0

Please sign in to comment.