
Automatic TearDown on Setup errors #18

Merged · 2 commits · Oct 18, 2023
3 changes: 2 additions & 1 deletion dcrdtest/go.mod
@@ -21,6 +21,8 @@ require (
github.com/decred/dcrd/rpcclient/v8 v8.0.0
github.com/decred/dcrd/txscript/v4 v4.1.0
github.com/decred/dcrd/wire v1.6.0
github.com/decred/slog v1.2.0
matheusd.com/testctx v0.1.0
)

require (
@@ -42,7 +44,6 @@
github.com/decred/dcrd/math/uint256 v1.0.1 // indirect
github.com/decred/dcrd/peer/v3 v3.0.2 // indirect
github.com/decred/go-socks v1.1.0 // indirect
github.com/decred/slog v1.2.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/jessevdk/go-flags v1.5.0 // indirect
2 changes: 2 additions & 0 deletions dcrdtest/go.sum
@@ -145,3 +145,5 @@ gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
lukechampine.com/blake3 v1.2.1 h1:YuqqRuaqsGV71BV/nm9xlI0MKUv4QC54jQnBChWbGnI=
lukechampine.com/blake3 v1.2.1/go.mod h1:0OFRp7fBtAylGVCO40o87sbupkyIGgbpv1+M1k1LM6k=
matheusd.com/testctx v0.1.0 h1:MBpaNuqr23ugnkA59gz8Bd6BQIGkvZr7M4vYAc/Apzc=
matheusd.com/testctx v0.1.0/go.mod h1:u9la0YA1XIBcEpTU/aHJ9q4/L0VttkwhkG2m4lrj7Ls=
29 changes: 27 additions & 2 deletions dcrdtest/memwallet.go
@@ -114,6 +114,12 @@ type memWallet struct {

net *chaincfg.Params

// quit is closed when the wallet is stopped.
quit chan struct{}

// wg tracks the mem wallet's goroutines.
wg sync.WaitGroup

rpc *rpcclient.Client

sync.RWMutex
@@ -164,14 +170,23 @@ func newMemWallet(net *chaincfg.Params, harnessID uint32) (*memWallet, error) {
utxos: make(map[wire.OutPoint]*utxo),
chainUpdateSignal: make(chan struct{}),
reorgJournal: make(map[int64]*undoEntry),
quit: make(chan struct{}),
wg: sync.WaitGroup{},
}, nil
}

// Start launches all goroutines required for the wallet to function properly.
func (m *memWallet) Start() {
m.wg.Add(1)
go m.chainSyncer()
}

// Stop stops all of the wallet's goroutines and blocks until they exit.
func (m *memWallet) Stop() {
close(m.quit)
m.wg.Wait()
}

// SyncedHeight returns the height the wallet is known to be synced to.
//
// This function is safe for concurrent access.
@@ -219,7 +234,10 @@ func (m *memWallet) IngestBlock(header []byte, filteredTxns [][]byte) {
// available. We do this in a new goroutine in order to avoid blocking
// the main loop of the rpc client.
go func() {
m.chainUpdateSignal <- struct{}{}
select {
case m.chainUpdateSignal <- struct{}{}:
case <-m.quit:
}
}()
}

@@ -230,10 +248,17 @@ func (m *memWallet) IngestBlock(header []byte, filteredTxns [][]byte) {
func (m *memWallet) chainSyncer() {
log.Tracef("memwallet.chainSyncer")
defer log.Tracef("memwallet.chainSyncer exit")
defer m.wg.Done()

var update *chainUpdate

for range m.chainUpdateSignal {
for {
select {
case <-m.chainUpdateSignal:
case <-m.quit:
return
}

// A new update is available, so pop the new chain update from
// the front of the update queue.
m.chainMtx.Lock()
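
Taken together, the quit channel, the WaitGroup, and the quit-paired sends give the wallet a conventional Go shutdown protocol. A distilled, runnable sketch of the same pattern follows; the worker type and its names are illustrative, not part of this diff:

```go
package main

import (
	"fmt"
	"sync"
)

// worker mirrors the memwallet setup: a signal channel drained by a
// long-lived goroutine, plus quit and wg for orderly shutdown.
type worker struct {
	signal chan struct{}
	quit   chan struct{}
	wg     sync.WaitGroup
}

// Start launches the processing goroutine, as memWallet.Start does for
// chainSyncer.
func (w *worker) Start() {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		for {
			select {
			case <-w.signal:
				fmt.Println("processing chain update")
			case <-w.quit:
				return
			}
		}
	}()
}

// Notify pairs the send with quit, as IngestBlock now does, so a
// notification arriving after Stop cannot leak a blocked goroutine.
func (w *worker) Notify() {
	select {
	case w.signal <- struct{}{}:
	case <-w.quit:
	}
}

// Stop closes quit and blocks until the goroutine exits, matching
// memWallet.Stop.
func (w *worker) Stop() {
	close(w.quit)
	w.wg.Wait()
}

func main() {
	w := &worker{signal: make(chan struct{}), quit: make(chan struct{})}
	w.Start()
	w.Notify()
	w.Stop()
	w.Notify() // safe no-op after Stop
}
```

The unbuffered signal channel is what makes the quit-paired send necessary: a bare `w.signal <- struct{}{}` would block forever once the receiver has exited.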
45 changes: 36 additions & 9 deletions dcrdtest/node.go
@@ -24,6 +24,9 @@ import (
rpc "github.com/decred/dcrd/rpcclient/v8"
)

// errDcrdCmdExec is the error returned when the dcrd binary fails to execute.
var errDcrdCmdExec = errors.New("unable to exec dcrd binary")
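
The sentinel is wrapped with %w at the failure site further down, which keeps it matchable via errors.Is while still recording the concrete exec error. A minimal sketch of the idiom; the binary path and function names here are invented for illustration:

```go
package main

import (
	"errors"
	"fmt"
	"os/exec"
)

// errCmdExec plays the role of errDcrdCmdExec: a package-level sentinel
// that callers can match without parsing error strings.
var errCmdExec = errors.New("unable to exec binary")

func startBinary(path string) error {
	cmd := exec.Command(path)
	if err := cmd.Start(); err != nil {
		// %w keeps errCmdExec in the chain; %v flattens the cause.
		return fmt.Errorf("%w: %v", errCmdExec, err)
	}
	return cmd.Wait()
}

func main() {
	err := startBinary("/no/such/dcrd")
	fmt.Println(errors.Is(err, errCmdExec)) // true
}
```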

// nodeConfig contains all the args and data required to launch a dcrd process
// and connect the rpc client to it.
type nodeConfig struct {
@@ -201,7 +204,6 @@ func newNode(config *nodeConfig, dataDir string) (*node, error) {
return &node{
config: config,
dataDir: dataDir,
cmd: config.command(),
}, nil
}

@@ -216,8 +218,10 @@ func (n *node) start(ctx context.Context) error {
var pid sync.WaitGroup
pid.Add(1)

cmd := n.config.command()

// Redirect stderr.
n.stderr, err = n.cmd.StderrPipe()
n.stderr, err = cmd.StderrPipe()
if err != nil {
return err
}
@@ -229,15 +233,18 @@
for {
line, err := r.ReadBytes('\n')
if errors.Is(err, io.EOF) {
log.Tracef("stderr: EOF")
n.logf("stderr: EOF")
return
} else if err != nil {
n.logf("stderr: Unable to read stderr: %v", err)
return
}
n.logf("stderr: %s", line)
}
}()

// Redirect stdout.
n.stdout, err = n.cmd.StdoutPipe()
n.stdout, err = cmd.StdoutPipe()
if err != nil {
return err
}
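
Both pipe readers follow the same drain-and-log loop; the change in this hunk is routing EOF and read errors through the node's logger instead of the package tracer. A standalone sketch of that loop, assuming a POSIX shell is available for the demo command:

```go
package main

import (
	"bufio"
	"errors"
	"io"
	"log"
	"os/exec"
)

// logPipe drains one pipe line by line, logging each line as well as
// EOF and read errors so the pipe's fate is visible in the logs.
func logPipe(prefix string, r io.Reader, done chan<- struct{}) {
	defer close(done)
	br := bufio.NewReader(r)
	for {
		line, err := br.ReadBytes('\n')
		if len(line) > 0 {
			log.Printf("%s: %s", prefix, line)
		}
		if errors.Is(err, io.EOF) {
			log.Printf("%s: EOF", prefix)
			return
		}
		if err != nil {
			log.Printf("%s: unable to read: %v", prefix, err)
			return
		}
	}
}

func main() {
	cmd := exec.Command("sh", "-c", "echo out; echo err >&2")
	stderr, err := cmd.StderrPipe()
	if err != nil {
		log.Fatal(err)
	}
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		log.Fatal(err)
	}
	d1, d2 := make(chan struct{}), make(chan struct{})
	go logPipe("stderr", stderr, d1)
	go logPipe("stdout", stdout, d2)
	if err := cmd.Start(); err != nil {
		log.Fatal(err)
	}
	<-d1
	<-d2
	_ = cmd.Wait()
}
```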
@@ -249,7 +256,10 @@
for {
line, err := r.ReadBytes('\n')
if errors.Is(err, io.EOF) {
log.Tracef("stdout: EOF")
n.logf("stdout: EOF")
return
} else if err != nil {
n.logf("stdout: Unable to read stdout: %v", err)
return
}
log.Tracef("stdout: %s", line)
@@ -269,8 +279,10 @@
switch msg := msg.(type) {
case boundP2PListenAddrEvent:
p2pAddr = string(msg)
n.logf("P2P listen addr: %s", p2pAddr)
case boundRPCListenAddrEvent:
rpcAddr = string(msg)
n.logf("RPC listen addr: %s", rpcAddr)
}
if p2pAddr != "" && rpcAddr != "" {
close(gotSubsysAddrs)
@@ -283,33 +295,45 @@
for err == nil {
_, err = nextIPCMessage(n.config.pipeRX.r)
}
n.logf("IPC messages drained")
}()

// Launch command and store pid.
if err := n.cmd.Start(); err != nil {
return err
if err := cmd.Start(); err != nil {
// When failing to execute, wait until running goroutines are
// closed.
pid.Done()
n.wg.Wait()
n.config.pipeTX.close()
n.config.pipeRX.close()
return fmt.Errorf("%w: %v", errDcrdCmdExec, err)
}
n.cmd = cmd
n.pid = n.cmd.Process.Pid

// Unblock pipes now pid is available
// Unblock pipes now that pid is available.
pid.Done()

f, err := os.Create(filepath.Join(n.config.String(), "dcrd.pid"))
if err != nil {
_ = n.stop() // Cleanup what has been done so far.
return err
}

n.pidFile = f.Name()
if _, err = fmt.Fprintf(f, "%d\n", n.cmd.Process.Pid); err != nil {
_ = n.stop() // Cleanup what has been done so far.
return err
}
if err := f.Close(); err != nil {
_ = n.stop() // Cleanup what has been done so far.
return err
}

// Read the RPC and P2P addresses.
select {
case <-ctx.Done():
_ = n.stop() // Cleanup what has been done so far.
return ctx.Err()
case <-gotSubsysAddrs:
n.p2pAddr = p2pAddr
@@ -323,7 +347,7 @@ func (n *node) start(ctx context.Context) error {
// properly. On Windows, interrupt is not supported, so a kill signal is used
// instead.
func (n *node) stop() error {
log.Tracef("stop %p %p", n.cmd, n.cmd.Process)
log.Tracef("stop %p", n.cmd)
defer log.Tracef("stop done")

if n.cmd == nil || n.cmd.Process == nil {
@@ -366,6 +390,9 @@ func (n *node) stop() error {
if err := n.config.pipeTX.close(); err != nil {
n.logf("Unable to close pipe TX: %v", err)
}

// Mark command terminated.
n.cmd = nil
return nil
}

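
Nil-ing n.cmd on the way out is what makes stop idempotent: the existing guard at the top of stop (n.cmd == nil) turns any second call into a no-op, which matters now that both the deferred SetUp teardown and an explicit TearDown can reach it. A toy sketch of that shape, with invented names:

```go
package main

import "fmt"

// node stands in for the real struct; handle plays the role of *exec.Cmd.
type node struct {
	handle *int
}

// stop terminates the process on the first call; later calls observe
// the nil handle and return immediately, so overlapping cleanup paths
// are harmless.
func (n *node) stop() error {
	if n.handle == nil {
		fmt.Println("stop: already terminated")
		return nil
	}
	fmt.Println("stop: terminating pid", *n.handle)
	n.handle = nil // mark command terminated
	return nil
}

func main() {
	pid := 4242
	n := &node{handle: &pid}
	_ = n.stop() // terminates
	_ = n.stop() // no-op
}
```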
64 changes: 57 additions & 7 deletions dcrdtest/rpc_harness.go
@@ -7,6 +7,7 @@ package dcrdtest

import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
@@ -38,6 +39,8 @@ var (
// throughout the life of this package.
pathToDCRD string
pathToDCRDMtx sync.RWMutex

errNilCoinbaseAddr = errors.New("memWallet coinbase addr is nil")
)

// Harness fully encapsulates an active dcrd process to provide a unified
@@ -64,6 +67,8 @@ type Harness struct {
maxConnRetries int
nodeNum int

keepNodeDir bool

sync.Mutex
}

@@ -211,14 +216,30 @@ func New(t *testing.T, activeNet *chaincfg.Params, handlers *rpcclient.Notificat
return h, nil
}

// SetKeepNodeDir sets whether the Harness keeps or removes its node dir
// after TearDown is called.
//
// This is NOT safe for concurrent access and MUST be called from the same
// goroutine that calls SetUp and TearDown.
func (h *Harness) SetKeepNodeDir(keep bool) {
h.keepNodeDir = keep
}

// SetUp initializes the rpc test state. Initialization includes: starting up a
// simnet node, creating a websockets client and connecting to the started
// node, and finally, optionally generating and submitting a testchain with a
// configurable number of mature coinbase outputs.
//
// NOTE: This method and TearDown should always be called from the same
// goroutine as they are not concurrent safe.
func (h *Harness) SetUp(ctx context.Context, createTestChain bool, numMatureOutputs uint32) error {
func (h *Harness) SetUp(ctx context.Context, createTestChain bool, numMatureOutputs uint32) (err error) {
defer func() {
if err != nil {
tearErr := h.TearDown()
if tearErr != nil {
log.Warnf("Unable to tear down after setup error %v: %v", err, tearErr)
}
}
}()

// Start the dcrd node itself. This spawns a new process which will be
// managed
if err := h.node.start(ctx); err != nil {
@@ -231,6 +252,9 @@

// Filter transactions that pay to the coinbase associated with the
// wallet.
if h.wallet.coinbaseAddr == nil {
return errNilCoinbaseAddr
}
filterAddrs := []stdaddr.Address{h.wallet.coinbaseAddr}
if err := h.Node.LoadTxFilter(ctx, true, filterAddrs, nil); err != nil {
return err
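
The mechanic behind the automatic teardown is SetUp's new named return value: the deferred closure reads the final err, so every failure path above gets cleanup without touching each return site. Reduced to a runnable sketch with hypothetical names:

```go
package main

import (
	"errors"
	"fmt"
)

// setUp distills the pattern: because err is a named return, the
// deferred function observes whatever error any return produced.
func setUp(fail bool) (err error) {
	defer func() {
		if err != nil {
			fmt.Println("tearing down after setup error:", err)
		}
	}()

	if fail {
		return errors.New("node failed to start")
	}
	return nil
}

func main() {
	_ = setUp(true)  // prints the teardown message
	_ = setUp(false) // succeeds; no teardown runs
}
```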
@@ -282,17 +306,36 @@ func (h *Harness) SetUp(ctx context.Context, createTestChain bool, numMatureOutp
// NOTE: This method and SetUp should always be called from the same goroutine
// as they are not concurrent safe.
func (h *Harness) TearDown() error {
log.Tracef("TearDown %p %p", h.Node, h.node)
defer log.Tracef("TearDown done")
log.Debugf("TearDown %p %p", h.Node, h.node)
defer log.Debugf("TearDown done")

if h.Node != nil {
log.Tracef("TearDown: Node")
log.Debugf("TearDown: Node")
h.Node.Shutdown()
h.Node = nil
}

log.Tracef("TearDown: node")
if err := h.node.shutdown(); err != nil {
return err
if h.node != nil {
log.Debugf("TearDown: node")
node := h.node
h.node = nil
if err := node.shutdown(); err != nil {
return err
}
}

log.Debugf("TearDown: wallet")
if h.wallet != nil {
h.wallet.Stop()
h.wallet = nil
}

if !h.keepNodeDir {
if err := os.RemoveAll(h.testNodeDir); err != nil {
log.Warnf("Unable to remove test node dir %s: %v", h.testNodeDir, err)
} else {
log.Debugf("Removed test node dir %s", h.testNodeDir)
}
}

return nil
@@ -301,7 +344,14 @@ func (h *Harness) TearDown() error {
// TearDownInTest performs the TearDown during a test, logging the error to the
// test object. If the test has not yet failed and the TearDown itself fails,
// then this fails the test.
//
// If the test has already failed, then the dir for node data is kept for manual
// debugging.
func (h *Harness) TearDownInTest(t testing.TB) {
if t.Failed() {
h.SetKeepNodeDir(true)
}

err := h.TearDown()
if err != nil {
errMsg := fmt.Sprintf("Unable to teardown dcrdtest harness: %v", err)
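
End to end, a test needs no manual cleanup on its failure paths. A sketch of the intended usage; the import path and New's trailing arguments are assumptions inferred from the surrounding code, not shown in this diff:

```go
package dcrdtest_test

import (
	"context"
	"testing"

	"github.com/decred/dcrd/chaincfg/v3"
	rpc "github.com/decred/dcrd/rpcclient/v8"

	"github.com/decred/dcrtest/dcrdtest" // assumed import path
)

func TestHarnessLifecycle(t *testing.T) {
	var handlers *rpc.NotificationHandlers // none needed here
	harness, err := dcrdtest.New(t, chaincfg.SimNetParams(), handlers, nil)
	if err != nil {
		t.Fatal(err)
	}

	// If the test has already failed, TearDownInTest keeps the node
	// dir (via SetKeepNodeDir) for manual debugging.
	defer harness.TearDownInTest(t)

	// SetUp now tears itself down on error, so a failed SetUp no
	// longer leaks the dcrd process, pipes, or wallet goroutines.
	if err := harness.SetUp(context.Background(), true, 25); err != nil {
		t.Fatalf("unable to complete rpc harness setup: %v", err)
	}
}
```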