From d21c480d67318798584c0171b7c23f905bc85188 Mon Sep 17 00:00:00 2001
From: Venkat
Date: Thu, 20 Apr 2017 15:43:45 -0700
Subject: [PATCH] Trigger ExtentDownEvent when a store is in GoingDown state (#148)

---
 services/controllerhost/api_handlers.go        | 43 ++++++-------
 .../controllerhost/controllerhost_test.go      | 16 ++++-
 services/controllerhost/dfdd.go                | 33 ++++++++++
 services/controllerhost/dfdd_test.go           | 64 +++++++++++++++++--
 services/controllerhost/extentmon_test.go      | 12 ++++
 5 files changed, 140 insertions(+), 28 deletions(-)

diff --git a/services/controllerhost/api_handlers.go b/services/controllerhost/api_handlers.go
index 6c41dfc9..e091b5d7 100644
--- a/services/controllerhost/api_handlers.go
+++ b/services/controllerhost/api_handlers.go
@@ -89,26 +89,18 @@ func isUUIDLengthValid(uuid string) bool {
 }
 
 func isInputHealthy(context *Context, extent *m.DestinationExtent) bool {
-
+    inputID := extent.GetInputHostUUID()
     // if this is a Kafka phantom extent, then assume "input" is healthy
     if common.IsKafkaPhantomInput(extent.GetInputHostUUID()) {
         return true
     }
-
-    return context.rpm.IsHostHealthy(common.InputServiceName, extent.GetInputHostUUID())
+    return context.rpm.IsHostHealthy(common.InputServiceName, inputID)
 }
 
 func isExtentBeingSealed(context *Context, extentID string) bool {
     return context.extentSeals.inProgress.Contains(extentID) || context.extentSeals.failed.Contains(extentID)
 }
 
-// isInputGoingDown returns true if the specified input host
-// is going down for planned maintenance or deployment
-func isInputGoingDown(context *Context, hostID string) bool {
-    state, _ := context.failureDetector.GetHostState(common.InputServiceName, hostID)
-    return state == dfddHostStateGoingDown
-}
-
 func getLockTimeout(result *resultCacheReadResult) time.Duration {
     if len(result.cachedResult) < 1 {
         return time.Second
     }
@@ -117,13 +109,11 @@ func isAnyStoreHealthy(context *Context, storeIDs []string) bool {
-
     // special-case Kafka phantom extents that do not really have a physical
     // store (in Cherami) and use a placeholder 'phantom' store instead.
     if common.AreKafkaPhantomStores(storeIDs) {
         return true
     }
-
     for _, id := range storeIDs {
         if context.rpm.IsHostHealthy(common.StoreServiceName, id) {
             return true
         }
     }
@@ -143,11 +133,14 @@
     }
 
     for _, h := range storeIDs {
-        if !context.rpm.IsHostHealthy(common.StoreServiceName, h) {
+        isDown := !context.rpm.IsHostHealthy(common.StoreServiceName, h)
+        isGoingDown := !isDown && isDfddHostStatusGoingDown(context.failureDetector, common.StoreServiceName, h)
+        if isDown || isGoingDown {
             context.log.WithFields(bark.Fields{
-                common.TagExt:  common.FmtExt(extent.GetExtentUUID()),
-                common.TagStor: common.FmtStor(h),
-            }).Info("Found unhealthy extent, store unhealthy")
+                common.TagExt:     common.FmtExt(extent.GetExtentUUID()),
+                common.TagStor:    common.FmtStor(h),
+                "isHostGoingDown": isGoingDown,
+            }).Info("found extent with unhealthy store")
             return false
         }
     }
@@ -297,21 +290,25 @@ func minOpenExtentsForDst(context *Context, dstPath string, dstType dstType) int
 }
 
 func getInputAddrIfExtentIsWritable(context *Context, extent *m.DestinationExtent, m3Scope int) (string, error) {
-    inputhost, err := context.rpm.ResolveUUID(common.InputServiceName, extent.GetInputHostUUID())
-    if err != nil {
+
+    isGoingDown := isDfddHostStatusGoingDown(context.failureDetector, common.InputServiceName, extent.GetInputHostUUID())
+    if isGoingDown {
         context.log.
             WithField(common.TagExt, common.FmtExt(extent.GetExtentUUID())).
             WithField(common.TagIn, common.FmtIn(extent.GetInputHostUUID())).
-            Info("Found unhealthy extent, input unhealthy")
-        return "", err
+            Info("input host is going down in dfdd, treating extent as unwritable")
+        return "", errNoInputHosts
     }
-    if isInputGoingDown(context, extent.GetInputHostUUID()) {
+
+    inputhost, err := context.rpm.ResolveUUID(common.InputServiceName, extent.GetInputHostUUID())
+    if err != nil {
         context.log.
             WithField(common.TagExt, common.FmtExt(extent.GetExtentUUID())).
             WithField(common.TagIn, common.FmtIn(extent.GetInputHostUUID())).
- Info("input host is in going down state, treating extent as unwritable") - return "", errNoInputHosts + Info("found unhealthy extent, cannot resolve input uuid") + return "", err } + if !areExtentStoresHealthy(context, extent) { return "", errNoStoreHosts } diff --git a/services/controllerhost/controllerhost_test.go b/services/controllerhost/controllerhost_test.go index 15f79801..973ac577 100644 --- a/services/controllerhost/controllerhost_test.go +++ b/services/controllerhost/controllerhost_test.go @@ -64,6 +64,7 @@ type McpSuite struct { inputPort int outputPort int storePort int + dfdd *dfddImpl mClient m.TChanMetadataService mockReplicator *mockreplicator.MockTChanReplicator } @@ -121,7 +122,20 @@ func (s *McpSuite) startController() { context.channel = s.mcp.GetTChannel() context.eventPipeline = NewEventPipeline(context, nEventPipelineWorkers) context.eventPipeline.Start() - context.failureDetector = NewDfdd(s.mcp.context, common.NewRealTimeSource()) + s.dfdd = NewDfdd(s.mcp.context, common.NewRealTimeSource()).(*dfddImpl) + context.failureDetector = s.dfdd + + hosts, _ := s.mockrpm.GetHosts(common.InputServiceName) + for _, h := range hosts { + event := &common.RingpopListenerEvent{Key: h.UUID, Type: common.HostAddedEvent} + s.dfdd.handleHostAddedEvent(common.InputServiceName, event) + } + hosts, _ = s.mockrpm.GetHosts(common.StoreServiceName) + for _, h := range hosts { + event := &common.RingpopListenerEvent{Key: h.UUID, Type: common.HostAddedEvent} + s.dfdd.handleHostAddedEvent(common.StoreServiceName, event) + } + s.mcp.started = 1 } diff --git a/services/controllerhost/dfdd.go b/services/controllerhost/dfdd.go index 4f798c9b..210b2ded 100644 --- a/services/controllerhost/dfdd.go +++ b/services/controllerhost/dfdd.go @@ -93,8 +93,17 @@ const ( dfddHostStateGoingDown dfddHostStateDown dfddHostStateForgotten + numDfddStates // must be the last member ) +var dfddStateNames = [numDfddStates]string{ + "unknown", + "up", + "goingDown", + "down", + "forgotten", +} + // state that represents that the host is about to // go down for planned deployment or maintenance const hostGoingDownEvent common.RingpopEventType = 99 @@ -307,6 +316,18 @@ func (dfdd *dfddImpl) handleHostGoingDownEvent(service string, event *common.Rin copy := deepCopyMap(hosts) copy[event.Key] = newDFDDHost(dfddHostStateGoingDown, dfdd.timeSource) dfdd.putHosts(service, copy) + + if service == common.StoreServiceName { + // When a store host is about to go down for deployment, + // we need to trigger draining of every OPEN extent. However, + // the store is still not *considered* down until its down + // in ringpop. 
Ideally, we need a StoreGoingDownEvent, but + // since the behavior is going to be the same as StoreFailedEvent, + // we simply enqueue a storeFailedEvent + if !dfdd.context.eventPipeline.Add(NewStoreHostFailedEvent(event.Key)) { + dfdd.context.log.WithField(common.TagEvent, event).Error("failed to enqueue event after store reported as GoingDown") + } + } } func (dfdd *dfddImpl) handleListenerEvent(service string, event *common.RingpopListenerEvent) { @@ -342,6 +363,11 @@ func (dfdd *dfddImpl) putHosts(service string, hosts map[string]dfddHost) { } } +func isDfddHostStatusGoingDown(dfdd Dfdd, service string, hostID string) bool { + state, _ := dfdd.GetHostState(service, hostID) + return state == dfddHostStateGoingDown +} + func newDFDDHost(state dfddHostState, timeSource common.TimeSource) dfddHost { return dfddHost{ state: state, @@ -349,6 +375,13 @@ func newDFDDHost(state dfddHostState, timeSource common.TimeSource) dfddHost { } } +func (state dfddHostState) String() string { + if state < 0 || state >= numDfddStates { + return "invalid" + } + return dfddStateNames[state] +} + // creates a new map and copies the key/values from the // given map into the new map. The capacity of the new // map will the max(hostMapInitialCapacity, len(src)) diff --git a/services/controllerhost/dfdd_test.go b/services/controllerhost/dfdd_test.go index 3267208b..7e8ca167 100644 --- a/services/controllerhost/dfdd_test.go +++ b/services/controllerhost/dfdd_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/suite" "github.com/uber-common/bark" "github.com/uber/cherami-server/common" + m "github.com/uber/cherami-thrift/.generated/go/metadata" ) type ( @@ -74,9 +75,13 @@ func (s *DfddTestSuite) TestFailureDetection() { s.rpm.NotifyListeners(common.InputServiceName, h, common.HostAddedEvent) s.rpm.NotifyListeners(common.InputServiceName, h, common.HostRemovedEvent) } - for _, h := range storeIDs { + for i, h := range storeIDs { s.rpm.NotifyListeners(common.StoreServiceName, h, common.HostAddedEvent) - s.rpm.NotifyListeners(common.StoreServiceName, h, common.HostRemovedEvent) + if i == len(storeIDs)-1 { + s.dfdd.ReportHostGoingDown(common.StoreServiceName, h) + } else { + s.rpm.NotifyListeners(common.StoreServiceName, h, common.HostRemovedEvent) + } } cond := func() bool { @@ -88,6 +93,15 @@ func (s *DfddTestSuite) TestFailureDetection() { s.True(succ, "Dfdd failed to detect failure within timeout") s.Equal(0, s.eventPipeline.numStoreRemoteExtentReplicatorDownEvents(), "unexpected events generated") + // async, not guaranteed to change state immediately + s.rpm.NotifyListeners(common.StoreServiceName, storeIDs[2], common.HostRemovedEvent) + cond = func() bool { + s, _ := s.dfdd.GetHostState(common.StoreServiceName, storeIDs[2]) + return s == dfddHostStateDown + } + succ = common.SpinWaitOnCondition(cond, 10*time.Second) + s.True(succ, "dfdd failed to mark host as down") + for _, h := range inHostIDs { state, _ := s.dfdd.GetHostState(common.InputServiceName, h) s.True(s.eventPipeline.isInputHostFailed(h), "Dfdd failed to detect in host failure") @@ -108,7 +122,6 @@ func (s *DfddTestSuite) TestFailureDetection() { succ := common.SpinWaitOnCondition(cond, time.Second*10) s.True(succ, "dfdd failed to remove store host entry after downToForgottenDuration") } - // now test hostGoingDown state s.rpm.NotifyListeners(common.InputServiceName, inHostIDs[0], common.HostAddedEvent) s.rpm.NotifyListeners(common.StoreServiceName, storeIDs[0], common.HostAddedEvent) @@ -129,13 +142,56 @@ func (s *DfddTestSuite) 
TestFailureDetection() { s.True(succ, "dfdd failed to discover new hosts") s.context.failureDetector = s.dfdd - succ = isInputGoingDown(s.context, inHostIDs[0]) + succ = isDfddHostStatusGoingDown(s.context.failureDetector, common.InputServiceName, inHostIDs[0]) s.True(succ, "isInputGoingDown() failed") + succ = isDfddHostStatusGoingDown(s.context.failureDetector, common.StoreServiceName, storeIDs[0]) + s.True(succ, "isStoreGoingDown() failed") + + dstExtent := &m.DestinationExtent{ + ExtentUUID: common.StringPtr(uuid.New()), + StoreUUIDs: []string{storeIDs[0]}, + } + + succ = areExtentStoresHealthy(s.context, dstExtent) + s.False(succ, "areExtentStoresHealthy check failed") + + dstExtent.StoreUUIDs = []string{storeIDs[1]} + succ = areExtentStoresHealthy(s.context, dstExtent) + s.False(succ, "areExtentStoresHealthy check failed") + s.dfdd.Stop() stateMachineTickerInterval = oldStateMachineInterval } +func (s *DfddTestSuite) TestDfddStateToString() { + states := []dfddHostState{ + dfddHostStateUnknown, + dfddHostStateUP, + dfddHostStateGoingDown, + dfddHostStateDown, + dfddHostStateForgotten, + dfddHostState(-1), + dfddHostState(128), + } + for _, st := range states { + switch st { + case dfddHostStateUnknown: + s.Equal("unknown", st.String()) + case dfddHostStateUP: + s.Equal("up", st.String()) + case dfddHostStateGoingDown: + s.Equal("goingDown", st.String()) + case dfddHostStateDown: + s.Equal("down", st.String()) + case dfddHostStateForgotten: + s.Equal("forgotten", st.String()) + default: + s.Equal("invalid", st.String()) + } + } +} + type testEventPipelineImpl struct { sync.Mutex nInputHostFailedEvents int diff --git a/services/controllerhost/extentmon_test.go b/services/controllerhost/extentmon_test.go index 7de81d13..0806d1b5 100644 --- a/services/controllerhost/extentmon_test.go +++ b/services/controllerhost/extentmon_test.go @@ -102,6 +102,7 @@ func (s *ExtentStateMonitorSuite) SetupTest() { s.mcp.context.eventPipeline = NewEventPipeline(s.mcp.context, 2) s.mcp.context.eventPipeline.Start() s.mcp.context.extentMonitor = newExtentStateMonitor(s.mcp.context) + s.mcp.context.failureDetector = NewDfdd(s.mcp.context, common.NewRealTimeSource()) s.mcp.context.m3Client = metrics.NewClient(common.NewMetricReporterWithHostname(configure.NewCommonServiceConfig()), metrics.Controller) } @@ -316,12 +317,18 @@ func (s *ExtentStateMonitorSuite) TestExtentMonitor() { s.mClient.DeleteDestination(nil, &shared.DeleteDestinationRequest{Path: common.StringPtr(destinations[1].GetPath())}) rpm := common.NewMockRingpopMonitor() + dfdd := s.mcp.context.failureDetector.(*dfddImpl) stores := make([]*MockStoreService, len(storeIDs)) for i := 0; i < len(storeIDs)-1; i++ { stores[i] = NewMockStoreService() stores[i].Start() rpm.Add(common.StoreServiceName, storeIDs[i], stores[i].hostPort) + event := &common.RingpopListenerEvent{ + Key: storeIDs[i], + Type: common.HostAddedEvent, + } + dfdd.handleHostAddedEvent(common.StoreServiceName, event) } inputService := NewMockInputOutputService("inputhost") @@ -329,6 +336,11 @@ func (s *ExtentStateMonitorSuite) TestExtentMonitor() { inputService.Start(common.InputServiceName, thriftService) rpm.Add(common.InputServiceName, inHostIDs[0], inputService.hostPort) + event := &common.RingpopListenerEvent{ + Key: inHostIDs[0], + Type: common.HostAddedEvent, + } + dfdd.handleHostAddedEvent(common.InputServiceName, event) outputService := NewMockInputOutputService("outputhost") thriftService = admin.NewTChanOutputHostAdminServer(outputService)