This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

integration.testDLQFromCassandra: Fix flakiness, speedup by 4x #168

Merged: 2 commits merged on Apr 26, 2017
16 changes: 14 additions & 2 deletions services/controllerhost/event_handlers.go
@@ -162,7 +162,7 @@ const (
const resultCacheRefreshMaxWaitTime = int64(500 * time.Millisecond)

// how long to wait for an input host to respond to a drain command
const drainExtentTimeout = time.Minute
var drainExtentTimeout = int64(time.Minute)

var (
sealExtentInitialCallTimeout = 2 * time.Second
@@ -279,6 +279,18 @@ func NewStoreHostFailedEvent(hostUUID string) Event {
}
}

// SetDrainExtentTimeout overrides the drain extent timeout
// to the given value. This method is intended only for unit tests.
func SetDrainExtentTimeout(timeout time.Duration) {
atomic.StoreInt64(&drainExtentTimeout, int64(timeout))
}

// GetDrainExtentTimeout returns the current drainExtentTimeout
func GetDrainExtentTimeout() time.Duration {
return time.Duration(atomic.LoadInt64(&drainExtentTimeout))
}

// Handle handles the creation of a new extent.
// Following are the async actions to be triggered on creation of an extent:
// a. For every input host that serves an open extent for the DST
@@ -1150,7 +1162,7 @@ func drainExtent(context *Context, dstID string, extentID string, inputID string
`reconfigID`: drainReq.GetUpdateUUID(),
}).Info(`sending drain command to input host`)

ctx, cancel := thrift.NewContext(drainExtentTimeout)
ctx, cancel := thrift.NewContext(GetDrainExtentTimeout())
if err = adminClient.DrainExtent(ctx, drainReq); err != nil {
context.m3Client.IncCounter(m3Scope, metrics.ControllerErrDrainFailed)
}
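The change above turns drainExtentTimeout from a const into an atomically accessed int64 so tests can shorten it without racing against the handlers that read it. As a minimal, self-contained sketch of that pattern (the package and identifier names below are illustrative, not the ones in this repo):

package tunable

import (
	"sync/atomic"
	"time"
)

// timeoutNanos holds the timeout as nanoseconds so it can be read and
// written atomically from concurrent goroutines.
var timeoutNanos = int64(time.Minute)

// SetTimeout overrides the timeout; intended for tests only.
func SetTimeout(d time.Duration) {
	atomic.StoreInt64(&timeoutNanos, int64(d))
}

// GetTimeout returns the currently configured timeout.
func GetTimeout() time.Duration {
	return time.Duration(atomic.LoadInt64(&timeoutNanos))
}

A test can lower the value up front and restore it on teardown, for example old := GetTimeout(); SetTimeout(5 * time.Second); defer SetTimeout(old). The test/integration/base.go change below uses the same idea (without the restore) via controllerhost.SetDrainExtentTimeout(5 * time.Second).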
1 change: 1 addition & 0 deletions test/integration/base.go
@@ -160,6 +160,7 @@ func (tb *testBase) setupSuiteImpl(t *testing.T) {

// Adjust the controller and storehost scan intervals
controllerhost.IntervalBtwnScans = time.Second
controllerhost.SetDrainExtentTimeout(5 * time.Second)
storehost.ExtStatsReporterSetReportInterval(time.Second)
storehost.ExtStatsReporterResume()

75 changes: 47 additions & 28 deletions test/integration/integration_test.go
@@ -1015,19 +1015,17 @@ func (s *NetIntegrationSuiteParallelA) TestDLQWithCassandra() {
destPath = `/test.runner.SmartRetry/TestDLQWithCassandra` // This path ensures that throttling is limited for this test
cgPath = `/test.runner.SmartRetry/TestDLQWithCassandraCG`
cgMaxDeliveryCount = 2
cgLockTimeout = 5
cgReadLoopTimeout = time.Minute / 2
cgLockTimeout = 1
cgExpectedDeliveryCount = cgMaxDeliveryCount + 1
cgVerifyLoopTimeout = time.Minute * 2
cgVerifyLoopTicker = cgLockTimeout * time.Second
cgVerifyLoopTicker = (cgLockTimeout * time.Second) / 2
cnsmrPrefetch = 10
publisherPubInterval = time.Second / 5
DLQPublishClearTime = cgLockTimeout * time.Second * 2
publisherPubInterval = 150

DLQMergeMessageTargetCount = 10
DLQPurgeMessageTargetCount = 10
DLQMessageStart = 10
DLQMessageSpacing = 4
DLQMessageStart = 5
DLQMessageSpacing = 2

// DLQ Delivery map special values
/* >0 = regular delivery count */
@@ -1102,7 +1100,9 @@ func (s *NetIntegrationSuiteParallelA) TestDLQWithCassandra() {
// Publish messages continuously in a goroutine;
// This ensures a steady supply of 'good' messages so that smart retry will not affect us
closeCh := make(chan struct{})
publisherCloseCh := make(chan struct{})
defer close(closeCh)

go func() {
i := 0
defer publisherTest.Close()
@@ -1118,6 +1118,8 @@ func (s *NetIntegrationSuiteParallelA) TestDLQWithCassandra() {
i++
case <-closeCh:
return
case <-publisherCloseCh:
return
}
}
}()
@@ -1199,7 +1201,7 @@ func (s *NetIntegrationSuiteParallelA) TestDLQWithCassandra() {
return ret
}

dlqConsumerTest := func() {
dlqConsumerTest := func(expected map[int]struct{}) {
// Create DLQ consumer group
cgReq.ConsumerGroupName = common.StringPtr(cgReq.GetConsumerGroupName() + `_DLQ`)
cgReq.DestinationPath = common.StringPtr(cgDesc.GetDeadLetterQueueDestinationUUID())
@@ -1218,16 +1220,19 @@ func (s *NetIntegrationSuiteParallelA) TestDLQWithCassandra() {

// Open the consumer channel
dlqDelivery := make(chan client.Delivery, 1)
dlqDelivery, errdlq = DLQConsumerTest.Open(delivery)
dlqDelivery, errdlq = DLQConsumerTest.Open(dlqDelivery)
s.NoError(errdlq)

// Verify that we receive every message expected to be in the DLQ
select {
case msg := <-dlqDelivery:
s.NotNil(msg)
msg.Ack()
case <-time.After(time.Minute):
s.Fail(`DLQ Consumer Group delivery should not time out`)
for len(expected) > 0 {
select {
case msg := <-dlqDelivery:
s.NotNil(msg)
msg.Ack()
msgID, _ := strconv.Atoi(string(msg.GetMessage().GetPayload().GetData()[4:]))
delete(expected, msgID)
case <-time.After(time.Minute):
s.Fail(`DLQ Consumer Group timed out before receiving all messages`)
}
}

// Verify that we can delete a DLQ consumer group
@@ -1310,6 +1315,18 @@ func (s *NetIntegrationSuiteParallelA) TestDLQWithCassandra() {

}()

msgsExpectedInDLQ := func() map[int]struct{} {
dlqMutex.Lock()
defer dlqMutex.Unlock()
result := make(map[int]struct{})
for id, v := range dlqDeliveryMap {
if v >= cgExpectedDeliveryCount {
result[id] = struct{}{}
}
}
return result
}

operationsLoop:
for {

@@ -1325,16 +1342,17 @@
p := <-phaseCh
switch p {
case purgeOp:
dlqConsumerTest()
time.Sleep(DLQPublishClearTime)

dlqConsumerTest(msgsExpectedInDLQ())

// Purge DLQ
err = fe.PurgeDLQForConsumerGroup(nil, purgeReq)

// Verify that repeating the request succeeds
err = fe.PurgeDLQForConsumerGroup(nil, purgeReq)
s.NoError(err)

// Verify that immediately issuing a purge request fails
// Verify that immediately issuing a merge request fails
// Note that this test could fail if the controller has somehow finished processing the above purge already (race condition)

err = fe.MergeDLQForConsumerGroup(nil, mergeReq)
@@ -1355,26 +1373,25 @@
// Wait for operation to complete
lll().Info(`Waiting for purge operation to complete...`)
waitTime := time.Now()
purgeWait:
for {
cond := func() bool {
dlqDestDesc, err = s.mClient.ReadDestination(nil, dReq)
s.Nil(err)
s.NotNil(dlqDestDesc)
if dlqDestDesc.DLQPurgeBefore == nil {
panic(`foo`)
}
if dlqDestDesc.GetDLQPurgeBefore() == 0 {
break purgeWait
}
time.Sleep(time.Second)
return dlqDestDesc.GetDLQPurgeBefore() == 0
}

succ := common.SpinWaitOnCondition(cond, time.Minute)
s.True(succ, "dlq purge operation timed out")

dlqMutex.Lock()
ll().Infof(`Performed purge, waited %v for purge to clear`, time.Since(waitTime))
dlqMutex.Unlock()

case mergeOp:
time.Sleep(DLQPublishClearTime)
dlqConsumerTest(msgsExpectedInDLQ())
// Merge DLQ
err = fe.MergeDLQForConsumerGroup(nil, mergeReq)
s.NoError(err)
@@ -1392,7 +1409,7 @@
dlqMutex.Lock()
ll().Infof(`Performed merge`)

// Mark all messages that should be in DLQ as purged
// Mark all messages that should be in DLQ as merged
for id, v := range dlqDeliveryMap {
if v >= cgExpectedDeliveryCount {
dlqDeliveryMap[id] = merged
@@ -1401,6 +1418,9 @@

dlqMutex.Unlock()

// close the publisher; we no longer need it
publisherCloseCh <- struct{}{}

case done:

verifyTimeout := time.NewTimer(cgVerifyLoopTimeout)
@@ -1499,7 +1519,6 @@ func (s *NetIntegrationSuiteParallelD) TestSmartRetryDisableDuringDLQMerge() {
// ll - local log
ll := func(fmtS string, rest ...interface{}) {
common.GetDefaultLogger().WithFields(bark.Fields{`phase`: phase}).Infof(fmtS, rest...)
//fmt.Printf(`p`+strconv.Itoa(phase)+` `+fmtS+"\n", rest...)
}

// lll - local log with lock (for race on access to phase)
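The main flakiness fix above replaces the hand-rolled purgeWait loop with common.SpinWaitOnCondition(cond, time.Minute), polling a closure until DLQPurgeBefore clears, and replaces the fixed time.Sleep(DLQPublishClearTime) pauses with draining the exact set returned by msgsExpectedInDLQ, which appears to be where much of the speedup comes from. As a rough sketch of what a helper with that call shape does (an illustration only; the real helper in this repo's common package may differ in its polling behavior):

package common

import "time"

// SpinWaitOnCondition polls cond until it returns true or until timeout
// elapses; it returns true only if the condition was satisfied in time.
func SpinWaitOnCondition(cond func() bool, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for !cond() {
		if time.Now().After(deadline) {
			return false
		}
		time.Sleep(100 * time.Millisecond) // poll interval is an assumption
	}
	return true
}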