fix(servstate): don't hold both servicesLock and state lock at once (canonical#359)

* fix(servstate): don't hold both servicesLock and state lock at once

This avoids the 3-lock deadlock described in
canonical#314. Other goroutines may be
holding the state lock and waiting for the services lock, so it's
problematic to acquire both locks at once. Break that part of the
cycle.

We could do this inside serviceForStart/serviceForStop by manually
calling Unlock() sooner, but that's error-prone. Instead, continue using
defer, and have the caller write the task log (which needs the state
lock) after the services lock is released.

* Add regression test for the deadlock issue

This test consistently FAILs without the fix in this PR and
consistently PASSes with it. The repro essentially follows the
instructions at
canonical#314 (comment)
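
To make the lock cycle from the first part of the commit message concrete, here is a minimal Go sketch. It is illustrative only, not Pebble code: the mutex and function names are hypothetical stand-ins for the state lock, the services lock, and the goroutines involved. Two goroutines take the same pair of locks in opposite orders, which is the pattern this change breaks:

package main

import "sync"

var stateLock, servicesLock sync.Mutex

func changeRunner(done chan<- struct{}) {
	stateLock.Lock()    // e.g. running a change: state lock held...
	servicesLock.Lock() // ...then the services lock is needed
	servicesLock.Unlock()
	stateLock.Unlock()
	done <- struct{}{}
}

func serviceHandler(done chan<- struct{}) {
	servicesLock.Lock() // e.g. the old serviceForStart: services lock held...
	stateLock.Lock()    // ...then writing the task log needs the state lock
	stateLock.Unlock()
	servicesLock.Unlock()
	done <- struct{}{}
}

func main() {
	done := make(chan struct{}, 2)
	go changeRunner(done)
	go serviceHandler(done)
	<-done
	<-done // with unlucky timing this never returns: the deadlock
}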
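
The fix itself is in the handlers.go diff below; as a reading aid, here is a self-contained sketch of its shape, using illustrative names rather than the real ServiceManager types. The function that holds the services lock only returns the log message, and the caller records it, which needs the state lock, only after the deferred unlock has run, so the two locks are never held together:

package main

import (
	"fmt"
	"sync"
)

type manager struct {
	servicesLock sync.Mutex
	services     map[string]bool
	stateLock    sync.Mutex // stand-in for the state lock taken when logging to a task
}

// lookup returns the message instead of logging it while servicesLock is held.
func (m *manager) lookup(name string) (running bool, logMsg string) {
	m.servicesLock.Lock()
	defer m.servicesLock.Unlock()
	if !m.services[name] {
		return false, fmt.Sprintf("Service %q has never been started.", name)
	}
	return true, ""
}

func (m *manager) handle(name string) {
	running, logMsg := m.lookup(name) // servicesLock is released on return
	if logMsg != "" {
		m.stateLock.Lock() // only now is the (stand-in) state lock taken
		fmt.Println(logMsg)
		m.stateLock.Unlock()
	}
	_ = running
}

func main() {
	m := &manager{services: map[string]bool{}}
	m.handle("test")
}
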
benhoyt committed Feb 20, 2024
1 parent f4e7338 commit 8a70b47
Showing 2 changed files with 136 additions and 22 deletions.
108 changes: 108 additions & 0 deletions internals/daemon/api_services_test.go
@@ -16,8 +16,11 @@ package daemon

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
"os"
@@ -359,3 +362,108 @@ services:
tasks := chg.Tasks()
c.Check(tasks, HasLen, 0)
}

// Regression test for 3-lock deadlock issue described in
// https://github.com/canonical/pebble/issues/314
func (s *apiSuite) TestDeadlock(c *C) {
// Set up
writeTestLayer(s.pebbleDir, `
services:
    test:
        override: replace
        command: sleep 10
`)
daemon, err := New(&Options{
Dir: s.pebbleDir,
SocketPath: s.pebbleDir + ".pebble.socket",
})
c.Assert(err, IsNil)
err = daemon.Init()
c.Assert(err, IsNil)
daemon.Start()

// To try to reproduce the deadlock, call these endpoints in a loop:
// - GET /v1/services
// - POST /v1/services with action=start
// - POST /v1/services with action=stop

getServices := func(ctx context.Context) {
req, err := http.NewRequestWithContext(ctx, "GET", "/v1/services", nil)
c.Assert(err, IsNil)
rsp := v1GetServices(apiCmd("/v1/services"), req, nil).(*resp)
rec := httptest.NewRecorder()
rsp.ServeHTTP(rec, req)
if rec.Code != 200 {
panic(fmt.Sprintf("expected 200, got %d", rec.Code))
}
}

serviceAction := func(ctx context.Context, action string) {
body := `{"action": "` + action + `", "services": ["test"]}`
req, err := http.NewRequestWithContext(ctx, "POST", "/v1/services", strings.NewReader(body))
c.Assert(err, IsNil)
rsp := v1PostServices(apiCmd("/v1/services"), req, nil).(*resp)
rec := httptest.NewRecorder()
rsp.ServeHTTP(rec, req)
if rec.Code != 202 {
panic(fmt.Sprintf("expected 202, got %d", rec.Code))
}
}

ctx, cancel := context.WithCancel(context.Background())

go func() {
for ctx.Err() == nil {
getServices(ctx)
time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
}
}()

go func() {
for ctx.Err() == nil {
serviceAction(ctx, "start")
time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
}
}()

go func() {
for ctx.Err() == nil {
serviceAction(ctx, "stop")
time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
}
}()

// Wait some time for deadlock to happen (when the bug was present, it
// normally happened in well under a second).
time.Sleep(time.Second)
cancel()

// Try to hit GET /v1/services once more; if it times out -- deadlock!
done := make(chan struct{})
go func() {
getServices(context.Background())
done <- struct{}{}
}()
select {
case <-done:
case <-time.After(time.Second):
c.Fatal("timed out waiting for final request -- deadlock!")
}

// Otherwise wait for all changes to be done, then clean up (stop the daemon).
var readyChans []<-chan struct{}
daemon.state.Lock()
for _, change := range daemon.state.Changes() {
readyChans = append(readyChans, change.Ready())
}
daemon.state.Unlock()
for _, ch := range readyChans {
select {
case <-ch:
case <-time.After(5 * time.Second):
c.Fatal("timed out waiting for ready channel")
}
}
err = daemon.Stop(nil)
c.Assert(err, IsNil)
}
50 changes: 28 additions & 22 deletions internals/overlord/servstate/handlers.go
@@ -123,7 +123,10 @@ func (m *ServiceManager) doStart(task *state.Task, tomb *tomb.Tomb) error {
}

// Create the service object (or reuse the existing one by name).
service := m.serviceForStart(task, config)
service, taskLog := m.serviceForStart(config)
if taskLog != "" {
addTaskLog(task, taskLog)
}
if service == nil {
return nil
}
@@ -164,11 +167,13 @@ func (m *ServiceManager) doStart(task *state.Task, tomb *tomb.Tomb) error {
// creates a new service object if one doesn't exist, returns the existing one
// if it already exists but is stopped, or returns nil if it already exists
// and is running.
func (m *ServiceManager) serviceForStart(task *state.Task, config *plan.Service) *serviceData {
//
// It also returns a message to add to the task's log, or empty string if none.
func (m *ServiceManager) serviceForStart(config *plan.Service) (service *serviceData, taskLog string) {
m.servicesLock.Lock()
defer m.servicesLock.Unlock()

service := m.services[config.Name]
service = m.services[config.Name]
if service == nil {
// Not already started, create a new service object.
service = &serviceData{
@@ -180,34 +185,33 @@ func (m *ServiceManager) serviceForStart(task *state.Task, config *plan.Service)
stopped: make(chan error, 2), // enough for killTimeElapsed to send, and exit if it happens after
}
m.services[config.Name] = service
return service
return service, ""
}

// Ensure config is up-to-date from the plan whenever the user starts a service.
service.config = config.Copy()

switch service.state {
case stateInitial, stateStarting, stateRunning:
taskLogf(task, "Service %q already started.", config.Name)
return nil
return nil, fmt.Sprintf("Service %q already started.", config.Name)
case stateBackoff, stateStopped, stateExited:
// Start allowed when service is backing off, was stopped, or has exited.
service.backoffNum = 0
service.backoffTime = 0
service.transition(stateInitial)
return service
return service, ""
default:
// Cannot start service while terminating or killing, handle in start().
return service
return service, ""
}
}

func taskLogf(task *state.Task, format string, args ...interface{}) {
func addTaskLog(task *state.Task, message string) {
st := task.State()
st.Lock()
defer st.Unlock()

task.Logf(format, args...)
task.Logf("%s", message)
}

func (m *ServiceManager) doStop(task *state.Task, tomb *tomb.Tomb) error {
Expand All @@ -218,7 +222,10 @@ func (m *ServiceManager) doStop(task *state.Task, tomb *tomb.Tomb) error {
return err
}

service := m.serviceForStop(task, request.Name)
service, taskLog := m.serviceForStop(request.Name)
if taskLog != "" {
addTaskLog(task, taskLog)
}
if service == nil {
return nil
}
@@ -250,29 +257,27 @@ func (m *ServiceManager) doStop(task *state.Task, tomb *tomb.Tomb) error {
// serviceForStop looks up the service by name in the services map; it
// returns the service object if it exists and is running, or nil if it's
// already stopped or has never been started.
func (m *ServiceManager) serviceForStop(task *state.Task, name string) *serviceData {
//
// It also returns a message to add to the task's log, or empty string if none.
func (m *ServiceManager) serviceForStop(name string) (service *serviceData, taskLog string) {
m.servicesLock.Lock()
defer m.servicesLock.Unlock()

service := m.services[name]
service = m.services[name]
if service == nil {
taskLogf(task, "Service %q has never been started.", name)
return nil
return nil, fmt.Sprintf("Service %q has never been started.", name)
}

switch service.state {
case stateTerminating, stateKilling:
taskLogf(task, "Service %q already stopping.", name)
return nil
return nil, fmt.Sprintf("Service %q already stopping.", name)
case stateStopped:
taskLogf(task, "Service %q already stopped.", name)
return nil
return nil, fmt.Sprintf("Service %q already stopped.", name)
case stateExited:
taskLogf(task, "Service %q had already exited.", name)
service.transition(stateStopped)
return nil
return nil, fmt.Sprintf("Service %q had already exited.", name)
default:
return service
return service, ""
}
}

@@ -583,6 +588,7 @@ func addLastLogs(task *state.Task, logBuffer *servicelog.RingBuffer) {
task.Logf("Most recent service output:\n%s", logs)
}
}

func (s *serviceData) doBackoff(action plan.ServiceAction, onType string) {
s.backoffNum++
s.backoffTime = calculateNextBackoff(s.config, s.backoffTime)