Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flaky shim test #7656

Merged
merged 4 commits into from
Jun 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions plugins/inputs/execd/shim/goshim.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (s *Shim) Run(pollInterval time.Duration) error {
}
gatherPromptCh := make(chan empty, 1)
s.gatherPromptChans = append(s.gatherPromptChans, gatherPromptCh)
wg.Add(1)
wg.Add(1) // one per input
go func(input telegraf.Input) {
startGathering(ctx, input, acc, gatherPromptCh, pollInterval)
if serviceInput, ok := input.(telegraf.ServiceInput); ok {
Expand Down Expand Up @@ -216,11 +216,7 @@ func startGathering(ctx context.Context, input telegraf.Input, acc telegraf.Accu
select {
case <-ctx.Done():
return
case _, open := <-gatherPromptCh:
if !open {
// stdin has closed.
return
}
case <-gatherPromptCh:
if err := input.Gather(acc); err != nil {
fmt.Fprintf(os.Stderr, "failed to gather metrics: %s", err)
}
Expand Down
10 changes: 3 additions & 7 deletions plugins/inputs/execd/shim/shim_posix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,7 @@ func TestShimUSR1SignalingWorks(t *testing.T) {
}
}()

timeout := time.NewTimer(10 * time.Second)

select {
case <-metricProcessed:
case <-timeout.C:
require.Fail(t, "Timeout waiting for metric to arrive")
}
<-metricProcessed
cancel()

r := bufio.NewReader(stdoutReader)
Expand All @@ -66,5 +60,7 @@ func TestShimUSR1SignalingWorks(t *testing.T) {
require.Equal(t, "measurement,tag=tag field=1i 1234000005678\n", out)

stdinWriter.Close()
readUntilEmpty(r)

<-exited
}
40 changes: 20 additions & 20 deletions plugins/inputs/execd/shim/shim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,12 @@ func TestShimWorks(t *testing.T) {

stdin, _ = io.Pipe() // hold the stdin pipe open

timeout := time.NewTimer(10 * time.Second)
metricProcessed, _ := runInputPlugin(t, 10*time.Millisecond)

select {
case <-metricProcessed:
case <-timeout.C:
require.Fail(t, "Timeout waiting for metric to arrive")
}
<-metricProcessed
for stdoutBytes.Len() == 0 {
select {
case <-timeout.C:
require.Fail(t, "Timeout waiting to read metric from stdout")
return
default:
time.Sleep(10 * time.Millisecond)
}
t.Log("Waiting for bytes available in stdout")
time.Sleep(10 * time.Millisecond)
}

out := string(stdoutBytes.Bytes())
Expand All @@ -52,29 +42,27 @@ func TestShimStdinSignalingWorks(t *testing.T) {
stdin = stdinReader
stdout = stdoutWriter

timeout := time.NewTimer(10 * time.Second)
metricProcessed, exited := runInputPlugin(t, 40*time.Second)

stdinWriter.Write([]byte("\n"))

select {
case <-metricProcessed:
case <-timeout.C:
require.Fail(t, "Timeout waiting for metric to arrive")
}
<-metricProcessed

r := bufio.NewReader(stdoutReader)
out, err := r.ReadString('\n')
require.NoError(t, err)
require.Equal(t, "measurement,tag=tag field=1i 1234000005678\n", out)

stdinWriter.Close()

readUntilEmpty(r)

// check that it exits cleanly
<-exited
}

func runInputPlugin(t *testing.T, interval time.Duration) (metricProcessed chan bool, exited chan bool) {
metricProcessed = make(chan bool)
metricProcessed = make(chan bool, 10)
exited = make(chan bool)
inp := &testInput{
metricProcessed: metricProcessed,
Expand Down Expand Up @@ -172,3 +160,15 @@ func (i *serviceInput) Start(acc telegraf.Accumulator) error {

func (i *serviceInput) Stop() {
}

// we can get stuck if stdout gets clogged up and nobody's reading from it.
// make sure we keep it going
func readUntilEmpty(r *bufio.Reader) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't the shim code we are testing be doing this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, because this is the read end of the out pipe coming from the shim.

go func() {
var err error
for err != io.EOF {
_, err = r.ReadString('\n')
time.Sleep(10 * time.Millisecond)
}
}()
}