Skip to content

Commit

Permalink
Merge branch 'main' into fix/log_test
Browse files Browse the repository at this point in the history
# Conflicts:
#	raft/log_test.go
  • Loading branch information
falser101 committed Nov 15, 2022
2 parents dd54a34 + 970ecfc commit 8ac5bb0
Show file tree
Hide file tree
Showing 54 changed files with 987 additions and 493 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
echo "${TARGET}"
case "${TARGET}" in
linux-amd64-e2e)
PASSES='build release e2e' MANUAL_VER=v3.5.0 CPU='4' EXPECT_DEBUG='true' COVER='false' RACE='true' ./scripts/test.sh 2>&1 | tee test.log
PASSES='build release e2e' CPU='4' EXPECT_DEBUG='true' COVER='false' RACE='true' ./scripts/test.sh 2>&1 | tee test.log
! grep -E "(--- FAIL:|FAIL:|DATA RACE|panic: test timed out|appears to have leaked)" -B50 -A10 test.log
;;
linux-386-e2e)
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG/CHANGELOG-3.5.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Previous change logs can be found at [CHANGELOG-3.4](https://github.com/etcd-io/
- Fix [auth invalid token and old revision errors in watch](https://github.com/etcd-io/etcd/pull/14547)
- Fix [avoid closing a watch with ID 0 incorrectly](https://github.com/etcd-io/etcd/pull/14563)
- Fix [auth: fix data consistency issue caused by recovery from snapshot](https://github.com/etcd-io/etcd/pull/14648)
- Fix [revision might be inconsistency between members when etcd crashes during processing defragmentation operation](https://github.com/etcd-io/etcd/pull/14733)

### Package `netutil`
- Fix [netutil: add url comparison without resolver to URLStringsEqual](https://github.com/etcd-io/etcd/pull/14573)
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG/CHANGELOG-3.6.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ See [code changes](https://github.com/etcd-io/etcd/compare/v3.5.0...v3.6.0).
- Package `wal` was moved to `storage/wal`
- Package `datadir` was moved to `storage/datadir`

### Package `raft`
- Send empty `MsgApp` when entry in-flight limits are exceeded. See [pull/14633](https://github.com/etcd-io/etcd/pull/14633).
- Add [MaxInflightBytes](https://github.com/etcd-io/etcd/pull/14624) setting in `raft.Config` for better flow control of entries.

### etcd server

- Add [`etcd --log-format`](https://github.com/etcd-io/etcd/pull/13339) flag to support log format.
Expand Down
4 changes: 2 additions & 2 deletions contrib/lock/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func handler(w http.ResponseWriter, r *http.Request) {
}

func escape(s string) string {
escaped := strings.Replace(s, "\n", " ", -1)
escaped = strings.Replace(escaped, "\r", " ", -1)
escaped := strings.ReplaceAll(s, "\n", " ")
escaped = strings.ReplaceAll(escaped, "\r", " ")
return escaped
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/cobrautl/help.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ GLOBAL OPTIONS:
{{end}}
`[1:]

commandUsageTemplate = template.Must(template.New("command_usage").Funcs(templFuncs).Parse(strings.Replace(commandUsage, "\\\n", "", -1)))
commandUsageTemplate = template.Must(template.New("command_usage").Funcs(templFuncs).Parse(strings.ReplaceAll(commandUsage, "\\\n", "")))
}

func etcdFlagUsages(flagSet *pflag.FlagSet) string {
Expand Down
213 changes: 144 additions & 69 deletions pkg/expect/expect.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package expect
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
Expand All @@ -33,18 +34,23 @@ import (

const DEBUG_LINES_TAIL = 40

var (
ErrProcessRunning = fmt.Errorf("process is still running")
)

type ExpectProcess struct {
cfg expectConfig

cmd *exec.Cmd
fpty *os.File
wg sync.WaitGroup

mu sync.Mutex // protects lines and err
lines []string
count int // increment whenever new line gets added
cur int // current read position
err error
mu sync.Mutex // protects lines, count, cur, exitErr and exitCode
lines []string
count int // increment whenever new line gets added
cur int // current read position
exitErr error // process exit error
exitCode int
}

// NewExpect creates a new process for expect testing.
Expand All @@ -69,8 +75,9 @@ func NewExpectWithEnv(name string, args []string, env []string, serverProcessCon
return nil, err
}

ep.wg.Add(1)
ep.wg.Add(2)
go ep.read()
go ep.waitSaveExitErr()
return ep, nil
}

Expand All @@ -95,46 +102,83 @@ func (ep *ExpectProcess) Pid() int {

func (ep *ExpectProcess) read() {
defer ep.wg.Done()
printDebugLines := os.Getenv("EXPECT_DEBUG") != ""
defer func(fpty *os.File) {
err := fpty.Close()
if err != nil {
// we deliberately only log the error here, closing the PTY should mostly be (expected) broken pipes
fmt.Printf("error while closing fpty: %v", err)
}
}(ep.fpty)

r := bufio.NewReader(ep.fpty)
for {
l, err := r.ReadString('\n')
ep.mu.Lock()
if l != "" {
if printDebugLines {
fmt.Printf("%s (%s) (%d): %s", ep.cmd.Path, ep.cfg.name, ep.cmd.Process.Pid, l)
}
ep.lines = append(ep.lines, l)
ep.count++
}
err := ep.tryReadNextLine(r)
if err != nil {
ep.err = err
ep.mu.Unlock()
break
}
ep.mu.Unlock()
}
}

func (ep *ExpectProcess) tryReadNextLine(r *bufio.Reader) error {
printDebugLines := os.Getenv("EXPECT_DEBUG") != ""
l, err := r.ReadString('\n')

ep.mu.Lock()
defer ep.mu.Unlock()

if l != "" {
if printDebugLines {
fmt.Printf("%s (%s) (%d): %s", ep.cmd.Path, ep.cfg.name, ep.cmd.Process.Pid, l)
}
ep.lines = append(ep.lines, l)
ep.count++
}

// we're checking the error here at the bottom to ensure any leftover reads are still taken into account
return err
}

func (ep *ExpectProcess) waitSaveExitErr() {
defer ep.wg.Done()
err := ep.waitProcess()

ep.mu.Lock()
defer ep.mu.Unlock()
if err != nil {
ep.exitErr = err
}
}

// ExpectFunc returns the first line satisfying the function f.
func (ep *ExpectProcess) ExpectFunc(ctx context.Context, f func(string) bool) (string, error) {
i := 0

for {
ep.mu.Lock()
for i < len(ep.lines) {
line := ep.lines[i]
i++
if f(line) {
ep.mu.Unlock()
return line, nil
line, errsFound := func() (string, bool) {
ep.mu.Lock()
defer ep.mu.Unlock()

// check if this expect has been already closed
if ep.cmd == nil {
return "", true
}

for i < len(ep.lines) {
line := ep.lines[i]
i++
if f(line) {
return line, false
}
}
return "", ep.exitErr != nil
}()

if line != "" {
return line, nil
}
if ep.err != nil {
ep.mu.Unlock()

if errsFound {
break
}
ep.mu.Unlock()

select {
case <-ctx.Done():
Expand All @@ -143,16 +187,18 @@ func (ep *ExpectProcess) ExpectFunc(ctx context.Context, f func(string) bool) (s
// continue loop
}
}

ep.mu.Lock()
defer ep.mu.Unlock()

lastLinesIndex := len(ep.lines) - DEBUG_LINES_TAIL
if lastLinesIndex < 0 {
lastLinesIndex = 0
}
lastLines := strings.Join(ep.lines[lastLinesIndex:], "")
ep.mu.Unlock()
return "", fmt.Errorf("match not found."+
" Set EXPECT_DEBUG for more info Err: %v, last lines:\n%s",
ep.err, lastLines)
return "", fmt.Errorf("match not found. "+
" Set EXPECT_DEBUG for more info Errs: [%v], last lines:\n%s",
ep.exitErr, lastLines)
}

// ExpectWithContext returns the first line containing the given string.
Expand All @@ -174,63 +220,92 @@ func (ep *ExpectProcess) LineCount() int {
return ep.count
}

// Stop kills the expect process and waits for it to exit.
func (ep *ExpectProcess) Stop() error { return ep.close(true) }
// ExitCode returns the exit code of this process.
// If the process is still running, it returns exit code 0 and ErrProcessRunning.
func (ep *ExpectProcess) ExitCode() (int, error) {
ep.mu.Lock()
defer ep.mu.Unlock()

// Signal sends a signal to the expect process
func (ep *ExpectProcess) Signal(sig os.Signal) error {
return ep.cmd.Process.Signal(sig)
if ep.cmd == nil {
return ep.exitCode, nil
}

return 0, ErrProcessRunning
}

func (ep *ExpectProcess) Wait() error {
_, err := ep.cmd.Process.Wait()
// ExitError returns the exit error of this process (if any).
// If the process is still running, it returns ErrProcessRunning instead.
func (ep *ExpectProcess) ExitError() error {
ep.mu.Lock()
defer ep.mu.Unlock()

if ep.cmd == nil {
return ep.exitErr
}

return ErrProcessRunning
}

// Stop signals the process to terminate via SIGTERM
func (ep *ExpectProcess) Stop() error {
err := ep.Signal(syscall.SIGTERM)
if err != nil && strings.Contains(err.Error(), "os: process already finished") {
return nil
}
return err
}

// Close waits for the expect process to exit.
// Close currently does not return error if process exited with !=0 status.
// TODO: Close should expose underlying process failure by default.
func (ep *ExpectProcess) Close() error { return ep.close(false) }
// Signal sends a signal to the expect process
func (ep *ExpectProcess) Signal(sig os.Signal) error {
ep.mu.Lock()
defer ep.mu.Unlock()

func (ep *ExpectProcess) close(kill bool) error {
if ep.cmd == nil {
return ep.err
}
if kill {
ep.Signal(syscall.SIGTERM)
return errors.New("expect process already closed")
}

err := ep.cmd.Wait()
ep.fpty.Close()
ep.wg.Wait()
return ep.cmd.Process.Signal(sig)
}

func (ep *ExpectProcess) waitProcess() error {
state, err := ep.cmd.Process.Wait()
if err != nil {
if !kill && strings.Contains(err.Error(), "exit status") {
// non-zero exit code
err = nil
} else if kill && strings.Contains(err.Error(), "signal:") {
err = nil
}
return err
}

ep.mu.Lock()
defer ep.mu.Unlock()
ep.exitCode = state.ExitCode()

if !state.Success() {
return fmt.Errorf("unexpected exit code [%d] after running [%s]", ep.exitCode, ep.cmd.String())
}

return nil
}

// Wait waits for the process to finish.
func (ep *ExpectProcess) Wait() {
ep.wg.Wait()
}

// Close waits for the expect process to exit and return its error.
func (ep *ExpectProcess) Close() error {
ep.wg.Wait()

ep.mu.Lock()
defer ep.mu.Unlock()

// this signals to other funcs that the process has finished
ep.cmd = nil
return err
return ep.exitErr
}

func (ep *ExpectProcess) Send(command string) error {
_, err := io.WriteString(ep.fpty, command)
return err
}

func (ep *ExpectProcess) ProcessError() error {
if strings.Contains(ep.err.Error(), "input/output error") {
// TODO: The expect library should not return
// `/dev/ptmx: input/output error` when process just exits.
return nil
}
return ep.err
}

func (ep *ExpectProcess) Lines() []string {
ep.mu.Lock()
defer ep.mu.Unlock()
Expand Down
Loading

0 comments on commit 8ac5bb0

Please sign in to comment.