fix(servstate): don't hold both servicesLock and state lock at once #359

Merged: 2 commits merged on Feb 15, 2024
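For context on the bug this PR addresses: the old code logged to the task (which requires the state lock) while still holding the service manager's servicesLock, while other code paths acquire the same locks in the opposite order. Below is a minimal, self-contained Go sketch of that kind of lock-ordering hazard, not Pebble code: the mutex names mirror the locks named in the diff, but both functions and the driver loop are hypothetical illustrations.

```go
// Minimal sketch (not Pebble code) of the lock-ordering hazard: one path
// takes servicesLock then the state lock, another takes them in the
// opposite order. Looping both concurrently deadlocks almost instantly.
package main

import (
	"fmt"
	"sync"
)

var (
	servicesLock sync.Mutex // stands in for ServiceManager.servicesLock
	stateLock    sync.Mutex // stands in for the overlord state lock
)

// Old-style path: logging to the task needs the state lock while
// servicesLock is still held.
func startServiceOldStyle() {
	servicesLock.Lock()
	defer servicesLock.Unlock()
	stateLock.Lock() // e.g. log "already started" to the task
	defer stateLock.Unlock()
}

// A caller that already holds the state lock and then consults the
// services map, taking servicesLock second.
func queryServicesUnderState() {
	stateLock.Lock()
	defer stateLock.Unlock()
	servicesLock.Lock()
	defer servicesLock.Unlock()
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10000; i++ {
		wg.Add(2)
		go func() { defer wg.Done(); startServiceOldStyle() }()
		go func() { defer wg.Done(); queryServicesUnderState() }()
	}
	wg.Wait()
	fmt.Println("no deadlock this run") // with the bad ordering you rarely get here
}
```

The change in handlers.go below follows the usual remedy: never take the state lock while holding servicesLock. serviceForStart and serviceForStop now return the message to log instead of logging it themselves, and the caller adds it to the task only after the services lock has been released.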
109 changes: 109 additions & 0 deletions internals/daemon/api_services_test.go
@@ -16,8 +16,11 @@ package daemon

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
"os"
@@ -359,3 +362,109 @@ services:
tasks := chg.Tasks()
c.Check(tasks, HasLen, 0)
}

// Regression test for 3-lock deadlock issue described in
// https://github.com/canonical/pebble/issues/314
func (s *apiSuite) TestDeadlock(c *C) {
// Set up
writeTestLayer(s.pebbleDir, `
services:
test:
override: replace
command: sleep 10
`)
daemon, err := New(&Options{
Dir: s.pebbleDir,
SocketPath: s.pebbleDir + ".pebble.socket",
})
c.Assert(err, IsNil)
err = daemon.Init()
c.Assert(err, IsNil)
err = daemon.Start()
c.Assert(err, IsNil)

// To try to reproduce the deadlock, call these endpoints in a loop:
// - GET /v1/services
// - POST /v1/services with action=start
// - POST /v1/services with action=stop

getServices := func(ctx context.Context) {
req, err := http.NewRequestWithContext(ctx, "GET", "/v1/services", nil)
c.Assert(err, IsNil)
rsp := v1GetServices(apiCmd("/v1/services"), req, nil).(*resp)
rec := httptest.NewRecorder()
rsp.ServeHTTP(rec, req)
if rec.Code != 200 {
panic(fmt.Sprintf("expected 200, got %d", rec.Code))
}
}

serviceAction := func(ctx context.Context, action string) {
body := `{"action": "` + action + `", "services": ["test"]}`
req, err := http.NewRequestWithContext(ctx, "POST", "/v1/services", strings.NewReader(body))
c.Assert(err, IsNil)
rsp := v1PostServices(apiCmd("/v1/services"), req, nil).(*resp)
rec := httptest.NewRecorder()
rsp.ServeHTTP(rec, req)
if rec.Code != 202 {
panic(fmt.Sprintf("expected 202, got %d", rec.Code))
}
}

ctx, cancel := context.WithCancel(context.Background())

go func() {
for ctx.Err() == nil {
getServices(ctx)
time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
}
}()

go func() {
for ctx.Err() == nil {
serviceAction(ctx, "start")
time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
}
}()

go func() {
for ctx.Err() == nil {
serviceAction(ctx, "stop")
time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
}
}()

// Wait some time for deadlock to happen (when the bug was present, it
// normally happened in well under a second).
time.Sleep(time.Second)
cancel()

// Try to hit GET /v1/services once more; if it times out -- deadlock!
done := make(chan struct{})
go func() {
getServices(context.Background())
done <- struct{}{}
}()
select {
case <-done:
case <-time.After(time.Second):
c.Fatal("timed out waiting for final request -- deadlock!")
}

// Otherwise wait for all changes to be done, then clean up (stop the daemon).
var readyChans []<-chan struct{}
daemon.state.Lock()
for _, change := range daemon.state.Changes() {
readyChans = append(readyChans, change.Ready())
}
daemon.state.Unlock()
for _, ch := range readyChans {
select {
case <-ch:
case <-time.After(5 * time.Second):
c.Fatal("timed out waiting for ready channel")
}
}
err = daemon.Stop(nil)
c.Assert(err, IsNil)
}
50 changes: 28 additions & 22 deletions internals/overlord/servstate/handlers.go
@@ -125,7 +125,10 @@ func (m *ServiceManager) doStart(task *state.Task, tomb *tomb.Tomb) error {
}

// Create the service object (or reuse the existing one by name).
service := m.serviceForStart(task, config)
service, taskLog := m.serviceForStart(config)
if taskLog != "" {
addTaskLog(task, taskLog)
}
if service == nil {
return nil
}
@@ -166,11 +169,13 @@ func (m *ServiceManager) doStart(task *state.Task, tomb *tomb.Tomb) error {
// creates a new service object if one doesn't exist, returns the existing one
// if it already exists but is stopped, or returns nil if it already exists
// and is running.
func (m *ServiceManager) serviceForStart(task *state.Task, config *plan.Service) *serviceData {
//
// It also returns a message to add to the task's log, or empty string if none.
func (m *ServiceManager) serviceForStart(config *plan.Service) (service *serviceData, taskLog string) {
m.servicesLock.Lock()
defer m.servicesLock.Unlock()

service := m.services[config.Name]
service = m.services[config.Name]
if service == nil {
// Not already started, create a new service object.
service = &serviceData{
@@ -182,34 +187,33 @@ func (m *ServiceManager) serviceForStart(task *state.Task, config *plan.Service)
stopped: make(chan error, 2), // enough for killTimeElapsed to send, and exit if it happens after
}
m.services[config.Name] = service
return service
return service, ""
}

// Ensure config is up-to-date from the plan whenever the user starts a service.
service.config = config.Copy()

switch service.state {
case stateInitial, stateStarting, stateRunning:
taskLogf(task, "Service %q already started.", config.Name)
return nil
return nil, fmt.Sprintf("Service %q already started.", config.Name)
case stateBackoff, stateStopped, stateExited:
// Start allowed when service is backing off, was stopped, or has exited.
service.backoffNum = 0
service.backoffTime = 0
service.transition(stateInitial)
return service
return service, ""
default:
// Cannot start service while terminating or killing, handle in start().
return service
return service, ""
}
}

func taskLogf(task *state.Task, format string, args ...interface{}) {
func addTaskLog(task *state.Task, message string) {
st := task.State()
st.Lock()
defer st.Unlock()

task.Logf(format, args...)
task.Logf("%s", message)
}

func (m *ServiceManager) doStop(task *state.Task, tomb *tomb.Tomb) error {
Expand All @@ -220,7 +224,10 @@ func (m *ServiceManager) doStop(task *state.Task, tomb *tomb.Tomb) error {
return err
}

service := m.serviceForStop(task, request.Name)
service, taskLog := m.serviceForStop(request.Name)
if taskLog != "" {
addTaskLog(task, taskLog)
}
if service == nil {
return nil
}
@@ -252,29 +259,27 @@ func (m *ServiceManager) doStop(task *state.Task, tomb *tomb.Tomb) error {
// serviceForStop looks up the service by name in the services map; it
// returns the service object if it exists and is running, or nil if it's
// already stopped or has never been started.
func (m *ServiceManager) serviceForStop(task *state.Task, name string) *serviceData {
//
// It also returns a message to add to the task's log, or empty string if none.
func (m *ServiceManager) serviceForStop(name string) (service *serviceData, taskLog string) {
m.servicesLock.Lock()
defer m.servicesLock.Unlock()

service := m.services[name]
service = m.services[name]
if service == nil {
taskLogf(task, "Service %q has never been started.", name)
return nil
return nil, fmt.Sprintf("Service %q has never been started.", name)
}

switch service.state {
case stateTerminating, stateKilling:
taskLogf(task, "Service %q already stopping.", name)
return nil
return nil, fmt.Sprintf("Service %q already stopping.", name)
case stateStopped:
taskLogf(task, "Service %q already stopped.", name)
return nil
return nil, fmt.Sprintf("Service %q already stopped.", name)
case stateExited:
taskLogf(task, "Service %q had already exited.", name)
service.transition(stateStopped)
return nil
return nil, fmt.Sprintf("Service %q had already exited.", name)
default:
return service
return service, ""
}
}

@@ -585,6 +590,7 @@ func addLastLogs(task *state.Task, logBuffer *servicelog.RingBuffer) {
task.Logf("Most recent service output:\n%s", logs)
}
}

func (s *serviceData) doBackoff(action plan.ServiceAction, onType string) {
s.backoffNum++
s.backoffTime = calculateNextBackoff(s.config, s.backoffTime)