diff --git a/internals/daemon/api_services_test.go b/internals/daemon/api_services_test.go
index 09ba46fc..1991091f 100644
--- a/internals/daemon/api_services_test.go
+++ b/internals/daemon/api_services_test.go
@@ -16,8 +16,11 @@ package daemon
 
 import (
 	"bytes"
+	"context"
 	"encoding/json"
+	"fmt"
 	"io/ioutil"
+	"math/rand"
 	"net/http"
 	"net/http/httptest"
 	"os"
@@ -359,3 +362,109 @@ services:
 	tasks := chg.Tasks()
 	c.Check(tasks, HasLen, 0)
 }
+
+// Regression test for 3-lock deadlock issue described in
+// https://github.com/canonical/pebble/issues/314
+func (s *apiSuite) TestDeadlock(c *C) {
+	// Set up
+	writeTestLayer(s.pebbleDir, `
+services:
+    test:
+        override: replace
+        command: sleep 10
+`)
+	daemon, err := New(&Options{
+		Dir:        s.pebbleDir,
+		SocketPath: s.pebbleDir + ".pebble.socket",
+	})
+	c.Assert(err, IsNil)
+	err = daemon.Init()
+	c.Assert(err, IsNil)
+	err = daemon.Start()
+	c.Assert(err, IsNil)
+
+	// To try to reproduce the deadlock, call these endpoints in a loop:
+	// - GET /v1/services
+	// - POST /v1/services with action=start
+	// - POST /v1/services with action=stop
+
+	getServices := func(ctx context.Context) {
+		req, err := http.NewRequestWithContext(ctx, "GET", "/v1/services", nil)
+		c.Assert(err, IsNil)
+		rsp := v1GetServices(apiCmd("/v1/services"), req, nil).(*resp)
+		rec := httptest.NewRecorder()
+		rsp.ServeHTTP(rec, req)
+		if rec.Code != 200 {
+			panic(fmt.Sprintf("expected 200, got %d", rec.Code))
+		}
+	}
+
+	serviceAction := func(ctx context.Context, action string) {
+		body := `{"action": "` + action + `", "services": ["test"]}`
+		req, err := http.NewRequestWithContext(ctx, "POST", "/v1/services", strings.NewReader(body))
+		c.Assert(err, IsNil)
+		rsp := v1PostServices(apiCmd("/v1/services"), req, nil).(*resp)
+		rec := httptest.NewRecorder()
+		rsp.ServeHTTP(rec, req)
+		if rec.Code != 202 {
+			panic(fmt.Sprintf("expected 202, got %d", rec.Code))
+		}
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	go func() {
+		for ctx.Err() == nil {
+			getServices(ctx)
+			time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
+		}
+	}()
+
+	go func() {
+		for ctx.Err() == nil {
+			serviceAction(ctx, "start")
+			time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
+		}
+	}()
+
+	go func() {
+		for ctx.Err() == nil {
+			serviceAction(ctx, "stop")
+			time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
+		}
+	}()
+
+	// Wait some time for deadlock to happen (when the bug was present, it
+	// normally happened in well under a second).
+	time.Sleep(time.Second)
+	cancel()
+
+	// Try to hit GET /v1/services once more; if it times out -- deadlock!
+	done := make(chan struct{})
+	go func() {
+		getServices(context.Background())
+		done <- struct{}{}
+	}()
+	select {
+	case <-done:
+	case <-time.After(time.Second):
+		c.Fatal("timed out waiting for final request -- deadlock!")
+	}
+
+	// Otherwise wait for all changes to be done, then clean up (stop the daemon).
+	var readyChans []<-chan struct{}
+	daemon.state.Lock()
+	for _, change := range daemon.state.Changes() {
+		readyChans = append(readyChans, change.Ready())
+	}
+	daemon.state.Unlock()
+	for _, ch := range readyChans {
+		select {
+		case <-ch:
+		case <-time.After(5 * time.Second):
+			c.Fatal("timed out waiting for ready channel")
+		}
+	}
+	err = daemon.Stop(nil)
+	c.Assert(err, IsNil)
+}
diff --git a/internals/overlord/servstate/handlers.go b/internals/overlord/servstate/handlers.go
index f753e8ea..a7902d36 100644
--- a/internals/overlord/servstate/handlers.go
+++ b/internals/overlord/servstate/handlers.go
@@ -125,7 +125,10 @@ func (m *ServiceManager) doStart(task *state.Task, tomb *tomb.Tomb) error {
 	}
 
 	// Create the service object (or reuse the existing one by name).
-	service := m.serviceForStart(task, config)
+	service, taskLog := m.serviceForStart(config)
+	if taskLog != "" {
+		addTaskLog(task, taskLog)
+	}
 	if service == nil {
 		return nil
 	}
@@ -166,11 +169,13 @@ func (m *ServiceManager) doStart(task *state.Task, tomb *tomb.Tomb) error {
 // creates a new service object if one doesn't exist, returns the existing one
 // if it already exists but is stopped, or returns nil if it already exists
 // and is running.
-func (m *ServiceManager) serviceForStart(task *state.Task, config *plan.Service) *serviceData {
+//
+// It also returns a message to add to the task's log, or empty string if none.
+func (m *ServiceManager) serviceForStart(config *plan.Service) (service *serviceData, taskLog string) {
 	m.servicesLock.Lock()
 	defer m.servicesLock.Unlock()
 
-	service := m.services[config.Name]
+	service = m.services[config.Name]
 	if service == nil {
 		// Not already started, create a new service object.
 		service = &serviceData{
@@ -182,7 +187,7 @@ func (m *ServiceManager) serviceForStart(task *state.Task, config *plan.Service)
 			stopped: make(chan error, 2), // enough for killTimeElapsed to send, and exit if it happens after
 		}
 		m.services[config.Name] = service
-		return service
+		return service, ""
 	}
 
 	// Ensure config is up-to-date from the plan whenever the user starts a service.
@@ -190,26 +195,25 @@ func (m *ServiceManager) serviceForStart(task *state.Task, config *plan.Service)
 
 	switch service.state {
 	case stateInitial, stateStarting, stateRunning:
-		taskLogf(task, "Service %q already started.", config.Name)
-		return nil
+		return nil, fmt.Sprintf("Service %q already started.", config.Name)
 	case stateBackoff, stateStopped, stateExited:
 		// Start allowed when service is backing off, was stopped, or has exited.
 		service.backoffNum = 0
 		service.backoffTime = 0
 		service.transition(stateInitial)
-		return service
+		return service, ""
 	default:
 		// Cannot start service while terminating or killing, handle in start().
-		return service
+		return service, ""
 	}
 }
 
-func taskLogf(task *state.Task, format string, args ...interface{}) {
+func addTaskLog(task *state.Task, message string) {
 	st := task.State()
 	st.Lock()
 	defer st.Unlock()
-	task.Logf(format, args...)
+	task.Logf("%s", message)
 }
 
 func (m *ServiceManager) doStop(task *state.Task, tomb *tomb.Tomb) error {
@@ -220,7 +224,10 @@ func (m *ServiceManager) doStop(task *state.Task, tomb *tomb.Tomb) error {
 		return err
 	}
 
-	service := m.serviceForStop(task, request.Name)
+	service, taskLog := m.serviceForStop(request.Name)
+	if taskLog != "" {
+		addTaskLog(task, taskLog)
+	}
 	if service == nil {
 		return nil
 	}
@@ -252,29 +259,27 @@ func (m *ServiceManager) doStop(task *state.Task, tomb *tomb.Tomb) error {
 // serviceForStop looks up the service by name in the services map; it
 // returns the service object if it exists and is running, or nil if it's
 // already stopped or has never been started.
-func (m *ServiceManager) serviceForStop(task *state.Task, name string) *serviceData {
+//
+// It also returns a message to add to the task's log, or empty string if none.
+func (m *ServiceManager) serviceForStop(name string) (service *serviceData, taskLog string) {
 	m.servicesLock.Lock()
 	defer m.servicesLock.Unlock()
 
-	service := m.services[name]
+	service = m.services[name]
 	if service == nil {
-		taskLogf(task, "Service %q has never been started.", name)
-		return nil
+		return nil, fmt.Sprintf("Service %q has never been started.", name)
 	}
 
 	switch service.state {
 	case stateTerminating, stateKilling:
-		taskLogf(task, "Service %q already stopping.", name)
-		return nil
+		return nil, fmt.Sprintf("Service %q already stopping.", name)
 	case stateStopped:
-		taskLogf(task, "Service %q already stopped.", name)
-		return nil
+		return nil, fmt.Sprintf("Service %q already stopped.", name)
 	case stateExited:
-		taskLogf(task, "Service %q had already exited.", name)
 		service.transition(stateStopped)
-		return nil
+		return nil, fmt.Sprintf("Service %q had already exited.", name)
 	default:
-		return service
+		return service, ""
 	}
 }
 
@@ -585,6 +590,7 @@ func addLastLogs(task *state.Task, logBuffer *servicelog.RingBuffer) {
 		task.Logf("Most recent service output:\n%s", logs)
 	}
 }
+
 func (s *serviceData) doBackoff(action plan.ServiceAction, onType string) {
 	s.backoffNum++
 	s.backoffTime = calculateNextBackoff(s.config, s.backoffTime)