Adding unit tests for publisher output #17460
@@ -0,0 +1,261 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pipeline

import (
	"flag"
	"math"
	"math/rand"
	"sync"
	"testing"
	"testing/quick"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/elastic/beats/v7/libbeat/common/atomic"
	"github.com/elastic/beats/v7/libbeat/logp"
	"github.com/elastic/beats/v7/libbeat/outputs"
	"github.com/elastic/beats/v7/libbeat/publisher"
	"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

var (
	SeedFlag = flag.Int64("seed", 0, "Randomization seed")
)

func TestMakeClientWorker(t *testing.T) {
	tests := map[string]func(mockPublishFn) outputs.Client{
		"client":         newMockClient,
		"network_client": newMockNetworkClient,
	}

	for name, ctor := range tests {
		t.Run(name, func(t *testing.T) {
			seedPRNG(t)

			err := quick.Check(func(i uint) bool {
				numBatches := 300 + (i % 100) // between 300 and 399

				var published atomic.Uint
				publishFn := func(batch publisher.Batch) error {
					published.Add(uint(len(batch.Events())))
					return nil
				}

				wqu := makeWorkQueue()
				client := ctor(publishFn)
				makeClientWorker(nilObserver, wqu, client)

				numEvents := atomic.MakeUint(0)
				for batchIdx := uint(0); batchIdx < numBatches; batchIdx++ {
					batch := randomBatch(50, 150, wqu)
					numEvents.Add(uint(len(batch.Events())))
					wqu <- batch
				}

				// Give some time for events to be published
				timeout := 20 * time.Second

				// Make sure that all events have eventually been published
				return waitUntilTrue(timeout, func() bool {
					return numEvents.Load() == published.Load()
				})
			}, nil)

			if err != nil {
				t.Error(err)
			}
		})
	}
}

func TestMakeClientWorkerAndClose(t *testing.T) {
	tests := map[string]func(mockPublishFn) outputs.Client{
		"client":         newMockClient,
		"network_client": newMockNetworkClient,
	}

	const minEventsInBatch = 50

	for name, ctor := range tests {
		t.Run(name, func(t *testing.T) {
			seedPRNG(t)

			err := quick.Check(func(i uint) bool {
				numBatches := 1000 + (i % 100) // between 1000 and 1099

				wqu := makeWorkQueue()
				numEvents := atomic.MakeUint(0)

				var wg sync.WaitGroup
				wg.Add(1)
				go func() {
					defer wg.Done()
					for batchIdx := uint(0); batchIdx < numBatches; batchIdx++ {
						batch := randomBatch(minEventsInBatch, 150, wqu)
						numEvents.Add(uint(len(batch.Events())))
						wqu <- batch
					}
				}()

				// Publish at least one batch worth of events, but no more
				// than 20% of all events
				publishLimit := uint(math.Max(minEventsInBatch, float64(numEvents.Load())*0.2))

				var publishedFirst atomic.Uint
				blockCtrl := make(chan struct{})
				blockingPublishFn := func(batch publisher.Batch) error {
					// Emulate blocking. Upon unblocking, the in-flight batch
					// that was blocked is published.
					if publishedFirst.Load() >= publishLimit {
						<-blockCtrl
					}

					publishedFirst.Add(uint(len(batch.Events())))
					return nil
				}

				client := ctor(blockingPublishFn)
				worker := makeClientWorker(nilObserver, wqu, client)

				// Allow the worker to make *some* progress before we close it
				timeout := 10 * time.Second
				progress := waitUntilTrue(timeout, func() bool {
					return publishedFirst.Load() >= publishLimit
				})
				if !progress {
					return false
				}

				// Close the worker before all batches have had time to be published
				err := worker.Close()
				require.NoError(t, err)
				close(blockCtrl)

				// Start a new worker to drain the work queue
				var publishedLater atomic.Uint
				countingPublishFn := func(batch publisher.Batch) error {
					publishedLater.Add(uint(len(batch.Events())))
					return nil
				}

				client = ctor(countingPublishFn)
				makeClientWorker(nilObserver, wqu, client)
				wg.Wait()

				// Make sure that all events have eventually been published
				timeout = 20 * time.Second
				return waitUntilTrue(timeout, func() bool {
					return numEvents.Load() == publishedFirst.Load()+publishedLater.Load()
				})
			}, &quick.Config{MaxCount: 50})
Review thread on &quick.Config{MaxCount: 50}:

> Keeping the […]

> Before introducing quick.Check, the count was actually 1 :) This is some kind of stress test. Unfortunately, stress tests don't sit well with Travis. We have had bad performance issues with the queue stress tests as well. Long term, I think we should not have stress tests run by Travis, but a separate job running those for even longer. For 'some' simple unit testing, a count of 1 might be ok.

> True, good point :) So would you recommend leaving this at 50, lowering it to 1, or maybe somewhere in between? I ask because, while 50 is working at the moment, I'm worried whether it'll become a source of flakiness. I don't think there's a way to know for sure until Travis runs this several times, though?

> Well, it's difficult to find the right value. Maybe set it to 25, so we have some more head-room.
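If the reviewer's compromise is adopted, the change is a one-line tweak to the quick.Config. A minimal sketch of the pattern, hypothetical and not part of this diff (TestMakeClientWorkerAndCloseFewerRuns and propertyFn are illustrative names; propertyFn stands in for the closure passed to quick.Check above):

// Sketch of the suggested compromise (hypothetical, not in this PR):
// cap randomized iterations at 25 instead of 50, keeping some
// head-room against CI flakiness while still exercising the property
// repeatedly.
func TestMakeClientWorkerAndCloseFewerRuns(t *testing.T) {
	seedPRNG(t)
	propertyFn := func(i uint) bool {
		// ... same body as in TestMakeClientWorkerAndClose above ...
		return true
	}
	if err := quick.Check(propertyFn, &quick.Config{MaxCount: 25}); err != nil {
		t.Error(err)
	}
}

Because seedPRNG logs the chosen seed, any failure found in a reduced run can still be replayed deterministically via the -seed flag.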
			if err != nil {
				t.Error(err)
			}
		})
	}
}

type mockPublishFn func(publisher.Batch) error

func newMockClient(publishFn mockPublishFn) outputs.Client {
	return &mockClient{publishFn: publishFn}
}

// mockClient is a minimal outputs.Client whose Publish delegates to the
// injected publish function.
type mockClient struct {
	publishFn mockPublishFn
}

func (c *mockClient) String() string { return "mock_client" }
func (c *mockClient) Close() error   { return nil }
func (c *mockClient) Publish(batch publisher.Batch) error {
	return c.publishFn(batch)
}

func newMockNetworkClient(publishFn mockPublishFn) outputs.Client {
	return &mockNetworkClient{newMockClient(publishFn)}
}

// mockNetworkClient embeds mockClient and adds a no-op Connect, marking
// it as a network client.
type mockNetworkClient struct {
	outputs.Client
}

func (c *mockNetworkClient) Connect() error { return nil }

// mockQueue, mockProducer, and mockConsumer are no-op stand-ins for the
// publisher queue, used by randomBatch below.
type mockQueue struct{}

func (q mockQueue) Close() error                                     { return nil }
func (q mockQueue) BufferConfig() queue.BufferConfig                 { return queue.BufferConfig{} }
func (q mockQueue) Producer(cfg queue.ProducerConfig) queue.Producer { return mockProducer{} }
func (q mockQueue) Consumer() queue.Consumer                         { return mockConsumer{} }

type mockProducer struct{}

func (p mockProducer) Publish(event publisher.Event) bool    { return true }
func (p mockProducer) TryPublish(event publisher.Event) bool { return true }
func (p mockProducer) Cancel() int                           { return 0 }

type mockConsumer struct{}

func (c mockConsumer) Get(eventCount int) (queue.Batch, error) { return &Batch{}, nil }
func (c mockConsumer) Close() error                            { return nil }

// randomBatch creates a Batch with a random number of events in
// [min, max), wired to a retryer on the given work queue.
func randomBatch(min, max int, wqu workQueue) *Batch {
	numEvents := randIntBetween(min, max)
	events := make([]publisher.Event, numEvents)

	consumer := newEventConsumer(logp.L(), mockQueue{}, &batchContext{})
	retryer := newRetryer(logp.L(), nilObserver, wqu, consumer)

	batch := Batch{
		events: events,
		ctx: &batchContext{
			observer: nilObserver,
			retryer:  retryer,
		},
	}

	return &batch
}

// randIntBetween returns a random integer in [min, max)
func randIntBetween(min, max int) int {
	return rand.Intn(max-min) + min
}

// seedPRNG seeds the PRNG from the -seed flag, or from the current time
// if the flag is unset, and logs the seed so a failing randomized run
// can be reproduced.
func seedPRNG(t *testing.T) {
	seed := *SeedFlag
	if seed == 0 {
		seed = time.Now().UnixNano()
	}

	t.Logf("reproduce test with `go test ... -seed %v`", seed)
	rand.Seed(seed)
}

// waitUntilTrue polls fn every millisecond until it returns true or the
// given duration elapses.
func waitUntilTrue(duration time.Duration, fn func() bool) bool {
	end := time.Now().Add(duration)
	for time.Now().Before(end) {
		if fn() {
			return true
		}
		time.Sleep(1 * time.Millisecond)
	}
	return false
}
Review thread on the worker close/drain behavior:

> Here we might have a race. E.g., how can you tell not all batches have been consumed? Does the initial worker 'block' without ACKing the batch?

> The initial worker does not block without ACKing the batch. Indeed, this is probably why I'm seeing different results locally vs. in the Travis CI environment.
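To make that question concrete, here is a hypothetical variant (not part of this diff) of the blocking publish callback in which the worker parks before the batch has been recorded at all; newBlockingMockClient is an illustrative name built on the mock helpers defined above:

// Hypothetical helper, for illustration only: a client whose Publish
// blocks *before* counting the batch. Any batch parked here when the
// worker is closed has already been taken off the work queue but is
// neither counted in published nor visibly re-queued, which is the
// situation the race question above is probing for.
func newBlockingMockClient(blockCtrl <-chan struct{}, published *atomic.Uint) outputs.Client {
	return newMockClient(func(batch publisher.Batch) error {
		<-blockCtrl // park: the batch is in flight and not yet acknowledged
		published.Add(uint(len(batch.Events())))
		return nil
	})
}

Whether such a parked batch is eventually counted depends on how worker.Close interacts with an in-flight Publish call, which is exactly what the exchange above is trying to pin down.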