Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Trigger ExtentDownEvent when a store is in GoingDown state (#148)
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Apr 20, 2017
1 parent c1a8a56 commit d21c480
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 28 deletions.
43 changes: 20 additions & 23 deletions services/controllerhost/api_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,26 +89,18 @@ func isUUIDLengthValid(uuid string) bool {
}

// isInputHealthy returns true if the input host serving the given
// extent is currently healthy, as reported by the ringpop monitor.
// Kafka phantom extents have no real (Cherami) input host, so they
// are always treated as healthy.
func isInputHealthy(context *Context, extent *m.DestinationExtent) bool {
	inputID := extent.GetInputHostUUID()
	// if this is a Kafka phantom extent, then assume "input" is healthy
	if common.IsKafkaPhantomInput(inputID) {
		return true
	}
	return context.rpm.IsHostHealthy(common.InputServiceName, inputID)
}

// isExtentBeingSealed reports whether a seal for the given extent is
// already underway or has previously failed (and is awaiting retry).
func isExtentBeingSealed(context *Context, extentID string) bool {
	seals := context.extentSeals
	if seals.inProgress.Contains(extentID) {
		return true
	}
	return seals.failed.Contains(extentID)
}

// isInputGoingDown returns true if the specified input host is about
// to go down for planned maintenance or deployment, i.e. the failure
// detector currently tracks it in the GoingDown state.
func isInputGoingDown(context *Context, hostID string) bool {
	hostState, _ := context.failureDetector.GetHostState(common.InputServiceName, hostID)
	return hostState == dfddHostStateGoingDown
}

func getLockTimeout(result *resultCacheReadResult) time.Duration {
if len(result.cachedResult) < 1 {
return time.Second
Expand All @@ -117,13 +109,11 @@ func getLockTimeout(result *resultCacheReadResult) time.Duration {
}

func isAnyStoreHealthy(context *Context, storeIDs []string) bool {

// special-case Kafka phantom extents that do not really have a physical
// store (in Cherami) and use a placeholder 'phantom' store instead.
if common.AreKafkaPhantomStores(storeIDs) {
return true
}

for _, id := range storeIDs {
if context.rpm.IsHostHealthy(common.StoreServiceName, id) {
return true
Expand All @@ -143,11 +133,14 @@ func areExtentStoresHealthy(context *Context, extent *m.DestinationExtent) bool
}

for _, h := range storeIDs {
if !context.rpm.IsHostHealthy(common.StoreServiceName, h) {
isDown := !context.rpm.IsHostHealthy(common.StoreServiceName, h)
isGoingDown := !isDown && isDfddHostStatusGoingDown(context.failureDetector, common.StoreServiceName, h)
if isDown || isGoingDown {
context.log.WithFields(bark.Fields{
common.TagExt: common.FmtExt(extent.GetExtentUUID()),
common.TagStor: common.FmtStor(h),
}).Info("Found unhealthy extent, store unhealthy")
common.TagExt: common.FmtExt(extent.GetExtentUUID()),
common.TagStor: common.FmtStor(h),
"isHostGoingDown": isGoingDown,
}).Info("found extent with unhealthy store")
return false
}
}
Expand Down Expand Up @@ -297,21 +290,25 @@ func minOpenExtentsForDst(context *Context, dstPath string, dstType dstType) int
}

func getInputAddrIfExtentIsWritable(context *Context, extent *m.DestinationExtent, m3Scope int) (string, error) {
inputhost, err := context.rpm.ResolveUUID(common.InputServiceName, extent.GetInputHostUUID())
if err != nil {

isGoingDown := isDfddHostStatusGoingDown(context.failureDetector, common.InputServiceName, extent.GetInputHostUUID())
if isGoingDown {
context.log.
WithField(common.TagExt, common.FmtExt(extent.GetExtentUUID())).
WithField(common.TagIn, common.FmtIn(extent.GetInputHostUUID())).
Info("Found unhealthy extent, input unhealthy")
return "", err
Info("input host is going down in dfdd, treating extent as unwritable")
return "", errNoInputHosts
}
if isInputGoingDown(context, extent.GetInputHostUUID()) {

inputhost, err := context.rpm.ResolveUUID(common.InputServiceName, extent.GetInputHostUUID())
if err != nil {
context.log.
WithField(common.TagExt, common.FmtExt(extent.GetExtentUUID())).
WithField(common.TagIn, common.FmtIn(extent.GetInputHostUUID())).
Info("input host is in going down state, treating extent as unwritable")
return "", errNoInputHosts
Info("found unhealthy extent, cannot resolve input uuid")
return "", err
}

if !areExtentStoresHealthy(context, extent) {
return "", errNoStoreHosts
}
Expand Down
16 changes: 15 additions & 1 deletion services/controllerhost/controllerhost_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type McpSuite struct {
inputPort int
outputPort int
storePort int
dfdd *dfddImpl
mClient m.TChanMetadataService
mockReplicator *mockreplicator.MockTChanReplicator
}
Expand Down Expand Up @@ -121,7 +122,20 @@ func (s *McpSuite) startController() {
context.channel = s.mcp.GetTChannel()
context.eventPipeline = NewEventPipeline(context, nEventPipelineWorkers)
context.eventPipeline.Start()
context.failureDetector = NewDfdd(s.mcp.context, common.NewRealTimeSource())
s.dfdd = NewDfdd(s.mcp.context, common.NewRealTimeSource()).(*dfddImpl)
context.failureDetector = s.dfdd

hosts, _ := s.mockrpm.GetHosts(common.InputServiceName)
for _, h := range hosts {
event := &common.RingpopListenerEvent{Key: h.UUID, Type: common.HostAddedEvent}
s.dfdd.handleHostAddedEvent(common.InputServiceName, event)
}
hosts, _ = s.mockrpm.GetHosts(common.StoreServiceName)
for _, h := range hosts {
event := &common.RingpopListenerEvent{Key: h.UUID, Type: common.HostAddedEvent}
s.dfdd.handleHostAddedEvent(common.StoreServiceName, event)
}

s.mcp.started = 1
}

Expand Down
33 changes: 33 additions & 0 deletions services/controllerhost/dfdd.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,17 @@ const (
dfddHostStateGoingDown
dfddHostStateDown
dfddHostStateForgotten
numDfddStates // must be the last member
)

// dfddStateNames holds the display name for each dfddHostState; it is
// indexed by the state's ordinal value, so the entries here must stay
// in the same order as the state const block above.
var dfddStateNames = [numDfddStates]string{
"unknown",
"up",
"goingDown",
"down",
"forgotten",
}

// state that represents that the host is about to
// go down for planned deployment or maintenance
const hostGoingDownEvent common.RingpopEventType = 99
Expand Down Expand Up @@ -307,6 +316,18 @@ func (dfdd *dfddImpl) handleHostGoingDownEvent(service string, event *common.Rin
copy := deepCopyMap(hosts)
copy[event.Key] = newDFDDHost(dfddHostStateGoingDown, dfdd.timeSource)
dfdd.putHosts(service, copy)

if service == common.StoreServiceName {
// When a store host is about to go down for deployment,
// we need to trigger draining of every OPEN extent. However,
// the store is still not *considered* down until it's down
// in ringpop. Ideally, we need a StoreGoingDownEvent, but
// since the behavior is going to be the same as StoreFailedEvent,
// we simply enqueue a storeFailedEvent
if !dfdd.context.eventPipeline.Add(NewStoreHostFailedEvent(event.Key)) {
dfdd.context.log.WithField(common.TagEvent, event).Error("failed to enqueue event after store reported as GoingDown")
}
}
}

func (dfdd *dfddImpl) handleListenerEvent(service string, event *common.RingpopListenerEvent) {
Expand Down Expand Up @@ -342,13 +363,25 @@ func (dfdd *dfddImpl) putHosts(service string, hosts map[string]dfddHost) {
}
}

// isDfddHostStatusGoingDown reports whether the failure detector
// currently tracks the given host (within the given service) in the
// GoingDown state, i.e. it is about to go down for planned
// maintenance or deployment.
func isDfddHostStatusGoingDown(dfdd Dfdd, service string, hostID string) bool {
	currentState, _ := dfdd.GetHostState(service, hostID)
	return currentState == dfddHostStateGoingDown
}

// newDFDDHost builds a dfddHost in the given state, stamping the
// state-change time with the current time from the supplied source.
func newDFDDHost(state dfddHostState, timeSource common.TimeSource) dfddHost {
	h := dfddHost{state: state}
	h.lastStateChangeTime = timeSource.Now().UnixNano()
	return h
}

// String returns a human-readable name for the state; any value
// outside the known range renders as "invalid".
func (state dfddHostState) String() string {
	if state >= 0 && state < numDfddStates {
		return dfddStateNames[state]
	}
	return "invalid"
}

// creates a new map and copies the key/values from the
// given map into the new map. The capacity of the new
// map will the max(hostMapInitialCapacity, len(src))
Expand Down
64 changes: 60 additions & 4 deletions services/controllerhost/dfdd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/suite"
"github.com/uber-common/bark"
"github.com/uber/cherami-server/common"
m "github.com/uber/cherami-thrift/.generated/go/metadata"
)

type (
Expand Down Expand Up @@ -74,9 +75,13 @@ func (s *DfddTestSuite) TestFailureDetection() {
s.rpm.NotifyListeners(common.InputServiceName, h, common.HostAddedEvent)
s.rpm.NotifyListeners(common.InputServiceName, h, common.HostRemovedEvent)
}
for _, h := range storeIDs {
for i, h := range storeIDs {
s.rpm.NotifyListeners(common.StoreServiceName, h, common.HostAddedEvent)
s.rpm.NotifyListeners(common.StoreServiceName, h, common.HostRemovedEvent)
if i == len(storeIDs)-1 {
s.dfdd.ReportHostGoingDown(common.StoreServiceName, h)
} else {
s.rpm.NotifyListeners(common.StoreServiceName, h, common.HostRemovedEvent)
}
}

cond := func() bool {
Expand All @@ -88,6 +93,15 @@ func (s *DfddTestSuite) TestFailureDetection() {
s.True(succ, "Dfdd failed to detect failure within timeout")
s.Equal(0, s.eventPipeline.numStoreRemoteExtentReplicatorDownEvents(), "unexpected events generated")

// async, not guaranteed to change state immediately
s.rpm.NotifyListeners(common.StoreServiceName, storeIDs[2], common.HostRemovedEvent)
cond = func() bool {
s, _ := s.dfdd.GetHostState(common.StoreServiceName, storeIDs[2])
return s == dfddHostStateDown
}
succ = common.SpinWaitOnCondition(cond, 10*time.Second)
s.True(succ, "dfdd failed to mark host as down")

for _, h := range inHostIDs {
state, _ := s.dfdd.GetHostState(common.InputServiceName, h)
s.True(s.eventPipeline.isInputHostFailed(h), "Dfdd failed to detect in host failure")
Expand All @@ -108,7 +122,6 @@ func (s *DfddTestSuite) TestFailureDetection() {
succ := common.SpinWaitOnCondition(cond, time.Second*10)
s.True(succ, "dfdd failed to remove store host entry after downToForgottenDuration")
}

// now test hostGoingDown state
s.rpm.NotifyListeners(common.InputServiceName, inHostIDs[0], common.HostAddedEvent)
s.rpm.NotifyListeners(common.StoreServiceName, storeIDs[0], common.HostAddedEvent)
Expand All @@ -129,13 +142,56 @@ func (s *DfddTestSuite) TestFailureDetection() {
s.True(succ, "dfdd failed to discover new hosts")

s.context.failureDetector = s.dfdd
succ = isInputGoingDown(s.context, inHostIDs[0])
succ = isDfddHostStatusGoingDown(s.context.failureDetector, common.InputServiceName, inHostIDs[0])
s.True(succ, "isInputGoingDown() failed")

succ = isDfddHostStatusGoingDown(s.context.failureDetector, common.StoreServiceName, storeIDs[0])
s.True(succ, "isStoreGoingDown() failed")

dstExtent := &m.DestinationExtent{
ExtentUUID: common.StringPtr(uuid.New()),
StoreUUIDs: []string{storeIDs[0]},
}

succ = areExtentStoresHealthy(s.context, dstExtent)
s.False(succ, "areExtentStoresHealthy check failed")

dstExtent.StoreUUIDs = []string{storeIDs[1]}
succ = areExtentStoresHealthy(s.context, dstExtent)
s.False(succ, "areExtentStoresHealthy check failed")

s.dfdd.Stop()
stateMachineTickerInterval = oldStateMachineInterval
}

// TestDfddStateToString verifies that every known dfdd host state
// renders its expected display name and that out-of-range values
// (negative or beyond numDfddStates) render as "invalid".
func (s *DfddTestSuite) TestDfddStateToString() {
	expected := map[dfddHostState]string{
		dfddHostStateUnknown:   "unknown",
		dfddHostStateUP:        "up",
		dfddHostStateGoingDown: "goingDown",
		dfddHostStateDown:      "down",
		dfddHostStateForgotten: "forgotten",
		dfddHostState(-1):      "invalid",
		dfddHostState(128):     "invalid",
	}
	for state, name := range expected {
		s.Equal(name, state.String())
	}
}

type testEventPipelineImpl struct {
sync.Mutex
nInputHostFailedEvents int
Expand Down
12 changes: 12 additions & 0 deletions services/controllerhost/extentmon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (s *ExtentStateMonitorSuite) SetupTest() {
s.mcp.context.eventPipeline = NewEventPipeline(s.mcp.context, 2)
s.mcp.context.eventPipeline.Start()
s.mcp.context.extentMonitor = newExtentStateMonitor(s.mcp.context)
s.mcp.context.failureDetector = NewDfdd(s.mcp.context, common.NewRealTimeSource())
s.mcp.context.m3Client = metrics.NewClient(common.NewMetricReporterWithHostname(configure.NewCommonServiceConfig()), metrics.Controller)
}

Expand Down Expand Up @@ -316,19 +317,30 @@ func (s *ExtentStateMonitorSuite) TestExtentMonitor() {
s.mClient.DeleteDestination(nil, &shared.DeleteDestinationRequest{Path: common.StringPtr(destinations[1].GetPath())})

rpm := common.NewMockRingpopMonitor()
dfdd := s.mcp.context.failureDetector.(*dfddImpl)

stores := make([]*MockStoreService, len(storeIDs))
for i := 0; i < len(storeIDs)-1; i++ {
stores[i] = NewMockStoreService()
stores[i].Start()
rpm.Add(common.StoreServiceName, storeIDs[i], stores[i].hostPort)
event := &common.RingpopListenerEvent{
Key: storeIDs[i],
Type: common.HostAddedEvent,
}
dfdd.handleHostAddedEvent(common.StoreServiceName, event)
}

inputService := NewMockInputOutputService("inputhost")
thriftService := admin.NewTChanInputHostAdminServer(inputService)
inputService.Start(common.InputServiceName, thriftService)

rpm.Add(common.InputServiceName, inHostIDs[0], inputService.hostPort)
event := &common.RingpopListenerEvent{
Key: inHostIDs[0],
Type: common.HostAddedEvent,
}
dfdd.handleHostAddedEvent(common.InputServiceName, event)

outputService := NewMockInputOutputService("outputhost")
thriftService = admin.NewTChanOutputHostAdminServer(outputService)
Expand Down

0 comments on commit d21c480

Please sign in to comment.