Skip to content

Commit

Permalink
Emulate blocking + mockable publish behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
ycombinator committed Apr 7, 2020
1 parent be4bf09 commit 508b606
Showing 1 changed file with 45 additions and 42 deletions.
87 changes: 45 additions & 42 deletions libbeat/publisher/pipeline/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var (
)

func TestMakeClientWorker(t *testing.T) {
tests := map[string]func(uint) publishCountable{
tests := map[string]func(mockPublishFn) outputs.Client{
"client": newMockClient,
"network_client": newMockNetworkClient,
}
Expand All @@ -52,8 +52,14 @@ func TestMakeClientWorker(t *testing.T) {
err := quick.Check(func(i uint) bool {
numBatches := 300 + (i % 100) // between 300 and 399

var published atomic.Uint
publishFn := func(batch publisher.Batch) error {
published.Add(uint(len(batch.Events())))
return nil
}

wqu := makeWorkQueue()
client := ctor(0)
client := ctor(publishFn)
makeClientWorker(nilObserver, wqu, client)

numEvents := atomic.MakeUint(0)
Expand All @@ -68,7 +74,7 @@ func TestMakeClientWorker(t *testing.T) {

// Make sure that all events have eventually been published
return waitUntilTrue(timeout, func() bool {
return numEvents.Load() == client.Published()
return numEvents == published
})
}, nil)

Expand All @@ -80,7 +86,7 @@ func TestMakeClientWorker(t *testing.T) {
}

func TestMakeClientWorkerAndClose(t *testing.T) {
tests := map[string]func(uint) publishCountable{
tests := map[string]func(mockPublishFn) outputs.Client{
"client": newMockClient,
"network_client": newMockNetworkClient,
}
Expand Down Expand Up @@ -109,14 +115,29 @@ func TestMakeClientWorkerAndClose(t *testing.T) {
}()

// Publish at least 1 batch worth of events but no more than 20% events
publishLimit := uint(math.Min(minEventsInBatch, float64(numEvents.Load())*0.2))
client := ctor(publishLimit)
publishLimit := uint(math.Max(minEventsInBatch, float64(numEvents.Load())*0.2))

var publishedFirst atomic.Uint
blockCtrl := make(chan struct{})
blockingPublishFn := func(batch publisher.Batch) error {
// Emulate blocking
if publishedFirst.Load() >= publishLimit {
batch.Retry()
<-blockCtrl
return nil
}

publishedFirst.Add(uint(len(batch.Events())))
return nil
}

client := ctor(blockingPublishFn)
worker := makeClientWorker(nilObserver, wqu, client)

// Allow the worker to make *some* progress before we close it
timeout := 10 * time.Second
progress := waitUntilTrue(timeout, func() bool {
return client.Published() >= publishLimit
return publishedFirst.Load() >= publishLimit
})
if !progress {
return false
Expand All @@ -125,19 +146,23 @@ func TestMakeClientWorkerAndClose(t *testing.T) {
// Close worker before all batches have had time to be published
err := worker.Close()
require.NoError(t, err)

published := client.Published()
close(blockCtrl)

// Start new worker to drain work queue
client = ctor(0)
var publishedLater atomic.Uint
countingPublishFn := func(batch publisher.Batch) error {
publishedLater.Add(uint(len(batch.Events())))
return nil
}

client = ctor(countingPublishFn)
makeClientWorker(nilObserver, wqu, client)
wg.Wait()

// Make sure that all events have eventually been published
timeout = 20 * time.Second
return waitUntilTrue(timeout, func() bool {
total := published + client.Published()
return numEvents.Load() == total
return numEvents.Load() == publishedFirst.Load()+publishedLater.Load()
})
}, &quick.Config{MaxCount: 50})

Expand All @@ -148,50 +173,28 @@ func TestMakeClientWorkerAndClose(t *testing.T) {
}
}

type publishCountable interface {
outputs.Client
Published() uint
}
type mockPublishFn func(publisher.Batch) error

func newMockClient(publishLimit uint) publishCountable {
return &mockClient{publishLimit: publishLimit}
func newMockClient(publishFn mockPublishFn) outputs.Client {
return &mockClient{publishFn: publishFn}
}

type mockClient struct {
mu sync.RWMutex
publishLimit uint
published uint
}

func (c *mockClient) Published() uint {
c.mu.RLock()
defer c.mu.RUnlock()

return c.published
publishFn mockPublishFn
}

func (c *mockClient) String() string { return "mock_client" }
func (c *mockClient) Close() error { return nil }
func (c *mockClient) Publish(batch publisher.Batch) error {
c.mu.Lock()
defer c.mu.Unlock()

// Block publishing
if c.publishLimit > 0 && c.published >= c.publishLimit {
batch.Retry() // to simulate not acking
return nil
}

c.published += uint(len(batch.Events()))
return nil
return c.publishFn(batch)
}

func newMockNetworkClient(publishLimit uint) publishCountable {
return &mockNetworkClient{newMockClient(publishLimit)}
func newMockNetworkClient(publishFn mockPublishFn) outputs.Client {
return &mockNetworkClient{newMockClient(publishFn)}
}

type mockNetworkClient struct {
publishCountable
outputs.Client
}

func (c *mockNetworkClient) Connect() error { return nil }
Expand Down

0 comments on commit 508b606

Please sign in to comment.