From b1fcd3ea29244b255efcd69ee6d8e2110b8d9ebe Mon Sep 17 00:00:00 2001
From: Venkatraghavan Srinivasan
Date: Mon, 24 Apr 2017 12:32:58 -0700
Subject: [PATCH 1/2] integration.testDLQFromCassandra: Fix flakiness, speedup by 4x

---
 services/controllerhost/event_handlers.go | 16 ++++-
 test/integration/base.go                  |  1 +
 test/integration/integration_test.go      | 75 ++++++++++++++---------
 3 files changed, 62 insertions(+), 30 deletions(-)

diff --git a/services/controllerhost/event_handlers.go b/services/controllerhost/event_handlers.go
index 2b820bac..814ddf58 100644
--- a/services/controllerhost/event_handlers.go
+++ b/services/controllerhost/event_handlers.go
@@ -162,7 +162,7 @@ const (
 const resultCacheRefreshMaxWaitTime = int64(500 * time.Millisecond)
 
 // how long to wait for an input host to respond to a drain command
-const drainExtentTimeout = time.Minute
+var drainExtentTimeout = int64(time.Minute)
 
 var (
 	sealExtentInitialCallTimeout = 2 * time.Second
@@ -279,6 +279,18 @@ func NewStoreHostFailedEvent(hostUUID string) Event {
 	}
 }
 
+// SetDrainExtentTimeout overrides the drain extent
+// timeout to the given value. This method is intended
+// only for unit tests
+func SetDrainExtentTimeout(timeout time.Duration) {
+	atomic.StoreInt64(&drainExtentTimeout, int64(timeout))
+}
+
+// GetDrainExtentTimeout returns the current drainExtentTimeout
+func GetDrainExtentTimeout() time.Duration {
+	return time.Duration(atomic.LoadInt64(&drainExtentTimeout))
+}
+
 // Handle handles the creation of a new extent.
 // Following are the async actions to be triggered on creation of an extent:
 // a. For every input host that serves a open extent for the DST
@@ -1150,7 +1162,7 @@ func drainExtent(context *Context, dstID string, extentID string, inputID string
 		`reconfigID`: drainReq.GetUpdateUUID(),
 	}).Info(`sending drain command to input host`)
 
-	ctx, cancel := thrift.NewContext(drainExtentTimeout)
+	ctx, cancel := thrift.NewContext(GetDrainExtentTimeout())
 	if err = adminClient.DrainExtent(ctx, drainReq); err != nil {
 		context.m3Client.IncCounter(m3Scope, metrics.ControllerErrDrainFailed)
 	}
diff --git a/test/integration/base.go b/test/integration/base.go
index 66f63af1..c6085576 100644
--- a/test/integration/base.go
+++ b/test/integration/base.go
@@ -160,6 +160,7 @@ func (tb *testBase) setupSuiteImpl(t *testing.T) {
 
 	// Adjust the controller and storehost scan intervals
 	controllerhost.IntervalBtwnScans = time.Second
+	controllerhost.SetDrainExtentTimeout(5*time.Second)
 	storehost.ExtStatsReporterSetReportInterval(time.Second)
 	storehost.ExtStatsReporterResume()
 
diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go
index cb387b6e..f0beea47 100644
--- a/test/integration/integration_test.go
+++ b/test/integration/integration_test.go
@@ -1015,19 +1015,17 @@ func (s *NetIntegrationSuiteParallelA) TestDLQWithCassandra() {
 		destPath = `/test.runner.SmartRetry/TestDLQWithCassandra` // This path ensures that throttling is limited for this test
 		cgPath   = `/test.runner.SmartRetry/TestDLQWithCassandraCG`
 		cgMaxDeliveryCount      = 2
-		cgLockTimeout           = 5
-		cgReadLoopTimeout       = time.Minute / 2
+		cgLockTimeout           = 1
 		cgExpectedDeliveryCount = cgMaxDeliveryCount + 1
 		cgVerifyLoopTimeout     = time.Minute * 2
-		cgVerifyLoopTicker      = cgLockTimeout * time.Second
+		cgVerifyLoopTicker      = (cgLockTimeout * time.Second)/2
 		cnsmrPrefetch           = 10
-		publisherPubInterval    = time.Second / 5
-		DLQPublishClearTime     = cgLockTimeout * time.Second * 2
+		publisherPubInterval    = 150
 		DLQMergeMessageTargetCount = 10
 		DLQPurgeMessageTargetCount = 10
-		DLQMessageStart   = 10
-		DLQMessageSpacing = 4
+		DLQMessageStart   = 5
+		DLQMessageSpacing = 2
 
 		// DLQ Delivery map special values
 		/* >0 = regular delivery count */
@@ -1102,7 +1100,9 @@ func (s *NetIntegrationSuiteParallelA) TestDLQWithCassandra() {
 	// Publish messages continuously in a goroutine;
 	// This ensures a steady supply of 'good' messages so that smart retry will not affect us
 	closeCh := make(chan struct{})
+	publisherCloseCh := make(chan struct{})
 	defer close(closeCh)
+
 	go func() {
 		i := 0
 		defer publisherTest.Close()
@@ -1118,6 +1118,8 @@ func (s *NetIntegrationSuiteParallelA) TestDLQWithCassandra() {
 				i++
 			case <-closeCh:
 				return
+			case <-publisherCloseCh:
+				return
 			}
 		}
 	}()
@@ -1199,7 +1201,7 @@ func (s *NetIntegrationSuiteParallelA) TestDLQWithCassandra() {
 		return ret
 	}
 
-	dlqConsumerTest := func() {
+	dlqConsumerTest := func(expected map[int]struct{}) {
 		// Create DLQ consumer group
 		cgReq.ConsumerGroupName = common.StringPtr(cgReq.GetConsumerGroupName() + `_DLQ`)
 		cgReq.DestinationPath = common.StringPtr(cgDesc.GetDeadLetterQueueDestinationUUID())
@@ -1218,16 +1220,19 @@ func (s *NetIntegrationSuiteParallelA) TestDLQWithCassandra() {
 
 		// Open the consumer channel
 		dlqDelivery := make(chan client.Delivery, 1)
-		dlqDelivery, errdlq = DLQConsumerTest.Open(delivery)
+		dlqDelivery, errdlq = DLQConsumerTest.Open(dlqDelivery)
 		s.NoError(errdlq)
 
-		// Verify that we can get at least one message off the DLQ
-		select {
-		case msg := <-dlqDelivery:
-			s.NotNil(msg)
-			msg.Ack()
-		case <-time.After(time.Minute):
-			s.Fail(`DLQ Consumer Group delivery should not time out`)
+		for len(expected) > 0 {
+			select {
+			case msg := <-dlqDelivery:
+				s.NotNil(msg)
+				msg.Ack()
+				msgID, _ := strconv.Atoi(string(msg.GetMessage().GetPayload().GetData()[4:]))
+				delete(expected, msgID)
+			case <-time.After(time.Minute):
+				s.Fail(`DLQ Consumer Group timed out before receiving all messages`)
+			}
 		}
 
 		// Verify that we can delete a DLQ consumer group
@@ -1310,6 +1315,18 @@ func (s *NetIntegrationSuiteParallelA) TestDLQWithCassandra() {
 
 	}()
 
+	msgsExpectedInDLQ := func() map[int]struct{} {
+		dlqMutex.Lock()
+		defer dlqMutex.Unlock()
+		result := make(map[int]struct{})
+		for id, v := range dlqDeliveryMap {
+			if v >= cgExpectedDeliveryCount {
+				result[id] = struct{}{}
+			}
+		}
+		return result
+	}
+
 operationsLoop:
 	for {
@@ -1325,8 +1342,9 @@ operationsLoop:
 		p := <-phaseCh
 		switch p {
 		case purgeOp:
-			dlqConsumerTest()
-			time.Sleep(DLQPublishClearTime)
+
+			dlqConsumerTest(msgsExpectedInDLQ())
+
 			// Purge DLQ
 			err = fe.PurgeDLQForConsumerGroup(nil, purgeReq)
@@ -1334,7 +1352,7 @@
 			err = fe.PurgeDLQForConsumerGroup(nil, purgeReq)
 			s.NoError(err)
 
-			// Verify that immediately issuing a purge request fails
+			// Verify that immediately issuing a merge request fails
 			// Note that this test could fail if the controller has somehow finished processing the above merge already (race condition)
 			err = fe.MergeDLQForConsumerGroup(nil, mergeReq)
@@ -1355,26 +1373,25 @@
 			// Wait for operation to complete
 			lll().Info(`Waiting for purge operation to complete...`)
 			waitTime := time.Now()
-		purgeWait:
-			for {
+			cond := func() bool {
 				dlqDestDesc, err = s.mClient.ReadDestination(nil, dReq)
 				s.Nil(err)
 				s.NotNil(dlqDestDesc)
 				if dlqDestDesc.DLQPurgeBefore == nil {
 					panic(`foo`)
 				}
-				if dlqDestDesc.GetDLQPurgeBefore() == 0 {
-					break purgeWait
-				}
-				time.Sleep(time.Second)
+				return dlqDestDesc.GetDLQPurgeBefore() == 0
 			}
+			succ := common.SpinWaitOnCondition(cond, time.Minute)
+			s.True(succ, "dlq purge operation timed out")
+
 			dlqMutex.Lock()
 			ll().Infof(`Performed purge, waited %v for purge to clear`, time.Since(waitTime))
 			dlqMutex.Unlock()
 
 		case mergeOp:
-			time.Sleep(DLQPublishClearTime)
+			dlqConsumerTest(msgsExpectedInDLQ())
 			// Merge DLQ
 			err = fe.MergeDLQForConsumerGroup(nil, mergeReq)
 			s.NoError(err)
@@ -1392,7 +1409,7 @@
 			dlqMutex.Lock()
 			ll().Infof(`Performed merge`)
 
-			// Mark all messages that should be in DLQ as purged
+			// Mark all messages that should be in DLQ as merged
 			for id, v := range dlqDeliveryMap {
 				if v >= cgExpectedDeliveryCount {
 					dlqDeliveryMap[id] = merged
@@ -1401,6 +1418,9 @@
 				}
 			}
 			dlqMutex.Unlock()
 
+			// close the publisher, we no longer need it
+			publisherCloseCh <- struct{}{}
+
 		case done:
 			verifyTimeout := time.NewTimer(cgVerifyLoopTimeout)
@@ -1499,7 +1519,6 @@ func (s *NetIntegrationSuiteParallelD) TestSmartRetryDisableDuringDLQMerge() {
 	// ll - local log
 	ll := func(fmtS string, rest ...interface{}) {
 		common.GetDefaultLogger().WithFields(bark.Fields{`phase`: phase}).Infof(fmtS, rest...)
-		//fmt.Printf(`p`+strconv.Itoa(phase)+` `+fmtS+"\n", rest...)
 	}
 
 	// lll - local log with lock (for race on access to phase)

From 37ac60c743e7f6346da6d9d2363096da6cfeef46 Mon Sep 17 00:00:00 2001
From: Venkatraghavan Srinivasan
Date: Mon, 24 Apr 2017 12:51:05 -0700
Subject: [PATCH 2/2] go fmt

---
 test/integration/base.go             | 2 +-
 test/integration/integration_test.go | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/test/integration/base.go b/test/integration/base.go
index c6085576..6b891b12 100644
--- a/test/integration/base.go
+++ b/test/integration/base.go
@@ -160,7 +160,7 @@ func (tb *testBase) setupSuiteImpl(t *testing.T) {
 
 	// Adjust the controller and storehost scan intervals
 	controllerhost.IntervalBtwnScans = time.Second
-	controllerhost.SetDrainExtentTimeout(5*time.Second)
+	controllerhost.SetDrainExtentTimeout(5 * time.Second)
 	storehost.ExtStatsReporterSetReportInterval(time.Second)
 	storehost.ExtStatsReporterResume()
 
diff --git a/test/integration/integration_test.go b/test/integration/integration_test.go
index f0beea47..c7e4581c 100644
--- a/test/integration/integration_test.go
+++ b/test/integration/integration_test.go
@@ -1018,7 +1018,7 @@ func (s *NetIntegrationSuiteParallelA) TestDLQWithCassandra() {
 		cgLockTimeout           = 1
 		cgExpectedDeliveryCount = cgMaxDeliveryCount + 1
 		cgVerifyLoopTimeout     = time.Minute * 2
-		cgVerifyLoopTicker      = (cgLockTimeout * time.Second)/2
+		cgVerifyLoopTicker      = (cgLockTimeout * time.Second) / 2
 		cnsmrPrefetch           = 10
 		publisherPubInterval    = 150
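
Note on the drainExtentTimeout change in patch 1: the const becomes an atomic int64 behind Set/Get accessors, which is what lets the integration suite shrink the one-minute drain timeout to five seconds without a data race. A minimal self-contained sketch of the same pattern follows; the lowercase identifier names are illustrative, not taken from cherami-server:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// drainTimeoutNanos holds the timeout as nanoseconds so tests can
// override it atomically while other goroutines read it concurrently.
var drainTimeoutNanos = int64(time.Minute)

// setDrainTimeout overrides the timeout; intended for tests only.
func setDrainTimeout(d time.Duration) {
	atomic.StoreInt64(&drainTimeoutNanos, int64(d))
}

// getDrainTimeout returns the currently configured timeout.
func getDrainTimeout() time.Duration {
	return time.Duration(atomic.LoadInt64(&drainTimeoutNanos))
}

func main() {
	fmt.Println(getDrainTimeout()) // 1m0s: the production default
	setDrainTimeout(5 * time.Second)
	fmt.Println(getDrainTimeout()) // 5s: the test override
}

Compared with assigning to a plain package-level variable from the test setup (as IntervalBtwnScans is overridden in base.go), the atomic accessors keep the override safe under the race detector when the controller reads the timeout from another goroutine.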
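The other half of the flakiness fix replaces fixed sleeps (DLQPublishClearTime, the hand-rolled purgeWait loop) with bounded condition polling via common.SpinWaitOnCondition(cond, time.Minute), asserting on the result. A sketch of the assumed semantics follows; the 100ms poll interval and the helper body are guesses, not taken from cherami-server's common package:

package main

import (
	"fmt"
	"time"
)

// spinWaitOnCondition polls cond until it returns true or timeout
// elapses, and reports whether the condition was met in time.
// (Assumed semantics; the real helper may use a different interval.)
func spinWaitOnCondition(cond func() bool, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for {
		if cond() {
			return true
		}
		if time.Now().After(deadline) {
			return false
		}
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	start := time.Now()
	done := spinWaitOnCondition(func() bool {
		// stand-in for "DLQPurgeBefore has been cleared"
		return time.Since(start) > 300*time.Millisecond
	}, time.Minute)
	fmt.Println(done) // true, after ~400ms rather than a fixed sleep
}

This is why the test both speeds up and stabilizes: the old unbounded for-loop could spin forever if the purge never completed, while the fixed sleeps it replaces wasted wall-clock time whenever the operation finished early.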