Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(state): don't lock global state lock when adding a task log #356

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 5 additions & 13 deletions internals/overlord/servstate/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (m *ServiceManager) serviceForStart(task *state.Task, config *plan.Service)

switch service.state {
case stateInitial, stateStarting, stateRunning:
taskLogf(task, "Service %q already started.", config.Name)
task.Logf("Service %q already started.", config.Name)
return nil
case stateBackoff, stateStopped, stateExited:
// Start allowed when service is backing off, was stopped, or has exited.
Expand All @@ -204,14 +204,6 @@ func (m *ServiceManager) serviceForStart(task *state.Task, config *plan.Service)
}
}

func taskLogf(task *state.Task, format string, args ...interface{}) {
st := task.State()
st.Lock()
defer st.Unlock()

task.Logf(format, args...)
}

func (m *ServiceManager) doStop(task *state.Task, tomb *tomb.Tomb) error {
m.state.Lock()
request, err := TaskServiceRequest(task)
Expand Down Expand Up @@ -258,19 +250,19 @@ func (m *ServiceManager) serviceForStop(task *state.Task, name string) *serviceD

service := m.services[name]
if service == nil {
taskLogf(task, "Service %q has never been started.", name)
task.Logf("Service %q has never been started.", name)
return nil
}

switch service.state {
case stateTerminating, stateKilling:
taskLogf(task, "Service %q already stopping.", name)
task.Logf("Service %q already stopping.", name)
return nil
case stateStopped:
taskLogf(task, "Service %q already stopped.", name)
task.Logf("Service %q already stopped.", name)
return nil
case stateExited:
taskLogf(task, "Service %q had already exited.", name)
task.Logf("Service %q had already exited.", name)
service.transition(stateStopped)
return nil
default:
Expand Down
20 changes: 13 additions & 7 deletions internals/overlord/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type State struct {

noticeCond *sync.Cond

modified bool
modified atomic.Bool

cache map[interface{}]interface{}

Expand All @@ -116,19 +116,19 @@ func New(backend Backend) *State {
tasks: make(map[string]*Task),
warnings: make(map[string]*Warning),
notices: make(map[noticeKey]*Notice),
modified: true,
cache: make(map[interface{}]interface{}),
pendingChangeByAttr: make(map[string]func(*Change) bool),
taskHandlers: make(map[int]func(t *Task, old Status, new Status)),
changeHandlers: make(map[int]func(chg *Change, old Status, new Status)),
}
st.modified.Store(true)
st.noticeCond = sync.NewCond(st) // use State.Lock and State.Unlock
return st
}

// Modified returns whether the state was modified since the last checkpoint.
func (s *State) Modified() bool {
return s.modified
return s.modified.Load()
}

// Lock acquires the state lock.
Expand All @@ -144,7 +144,7 @@ func (s *State) reading() {
}

func (s *State) writing() {
s.modified = true
s.modified.Store(true)
if atomic.LoadInt32(&s.muC) != 1 {
panic("internal error: accessing state without lock")
}
Expand Down Expand Up @@ -243,7 +243,14 @@ func (s *State) Unlocker() (unlock func() (relock func())) {
func (s *State) Unlock() {
defer s.unlock()

if !s.modified || s.backend == nil {
if s.backend == nil {
return
}

// If state hasn't been modified, do nothing. Otherwise, save state to
// disk and clear the flag.
modified := s.modified.Swap(false)
if !modified {
return
}

Expand All @@ -252,7 +259,6 @@ func (s *State) Unlock() {
start := time.Now()
for time.Since(start) <= unlockCheckpointRetryMaxTime {
if err = s.backend.Checkpoint(data); err == nil {
s.modified = false
return
}
time.Sleep(unlockCheckpointRetryInterval)
Expand Down Expand Up @@ -587,7 +593,7 @@ func ReadState(backend Backend, r io.Reader) (*State, error) {
}
s.backend = backend
s.noticeCond = sync.NewCond(s)
s.modified = false
s.modified.Store(false)
s.cache = make(map[interface{}]interface{})
s.pendingChangeByAttr = make(map[string]func(*Change) bool)
s.changeHandlers = make(map[int]func(chg *Change, old Status, new Status))
Expand Down
60 changes: 42 additions & 18 deletions internals/overlord/state/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package state
import (
"encoding/json"
"fmt"
"sync"
"time"

"github.com/canonical/pebble/internals/logger"
Expand Down Expand Up @@ -47,9 +48,14 @@ type Task struct {
waitTasks []string
haltTasks []string
lanes []int
log []string
change string

// Use a fine-grained lock for the logs to avoid holding the global state
// lock when just adding a log -- that easily caused deadlocks. See:
// https://github.com/canonical/pebble/issues/314
logLock sync.Mutex
log []string

spawnTime time.Time
readyTime time.Time

Expand Down Expand Up @@ -121,7 +127,7 @@ func (t *Task) MarshalJSON() ([]byte, error) {
WaitTasks: t.waitTasks,
HaltTasks: t.haltTasks,
Lanes: t.lanes,
Log: t.log,
Log: t.Log(),
Change: t.change,

SpawnTime: t.spawnTime,
Expand Down Expand Up @@ -165,7 +171,11 @@ func (t *Task) UnmarshalJSON(data []byte) error {
t.waitTasks = unmarshalled.WaitTasks
t.haltTasks = unmarshalled.HaltTasks
t.lanes = unmarshalled.Lanes
t.log = unmarshalled.Log
func() {
t.logLock.Lock()
defer t.logLock.Unlock()
t.log = unmarshalled.Log
}()
t.change = unmarshalled.Change
t.spawnTime = unmarshalled.SpawnTime
if unmarshalled.ReadyTime != nil {
Expand Down Expand Up @@ -416,42 +426,56 @@ func FakeTime(now time.Time) (restore func()) {
}

func (t *Task) addLog(kind, format string, args []interface{}) {
if len(t.log) > 9 {
copy(t.log, t.log[len(t.log)-9:])
t.log = t.log[:9]
}
var msg string
func() {
t.logLock.Lock()
defer t.logLock.Unlock()

if len(t.log) > 9 {
copy(t.log, t.log[len(t.log)-9:])
t.log = t.log[:9]
}

tstr := timeNow().Format(time.RFC3339)
msg = tstr + " " + kind + " " + fmt.Sprintf(format, args...)
t.log = append(t.log, msg)
}()

t.state.modified.Store(true)

tstr := timeNow().Format(time.RFC3339)
msg := tstr + " " + kind + " " + fmt.Sprintf(format, args...)
t.log = append(t.log, msg)
logger.Debugf(msg)
}

// Log returns the most recent messages logged into the task.
// Log returns a copy of the most recent messages logged into the task.
//
// Only the most recent entries logged are returned, potentially with
// different behavior for different task statuses. How many entries
// are returned is an implementation detail and may change over time.
//
// Messages are prefixed with one of the known message kinds.
// See details about LogInfo and LogError.
//
// The returned slice should not be read from without the
// state lock held, and should not be written to.
func (t *Task) Log() []string {
t.state.reading()
return t.log
t.logLock.Lock()
defer t.logLock.Unlock()

logCopy := make([]string, len(t.log))
for i, s := range t.log {
logCopy[i] = s
}
return logCopy
}

// Logf logs information about the progress of the task.
//
// The state lock does not need to be held when calling this method.
func (t *Task) Logf(format string, args ...interface{}) {
t.state.writing()
t.addLog(LogInfo, format, args)
}

// Errorf logs error information about the progress of the task.
//
// The state lock does not need to be held when calling this method.
func (t *Task) Errorf(format string, args ...interface{}) {
t.state.writing()
t.addLog(LogError, format, args)
}

Expand Down
3 changes: 0 additions & 3 deletions internals/overlord/state/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,6 @@ func (cs *taskSuite) TestMethodEntrance(c *C) {
func() { t1.Set("a", 1) },
func() { t2.WaitFor(t1) },
func() { t1.SetProgress("", 2, 2) },
func() { t1.Logf("") },
func() { t1.Errorf("") },
func() { t1.UnmarshalJSON(nil) },
func() { t1.SetProgress("", 1, 1) },
func() { t1.JoinLane(1) },
Expand All @@ -470,7 +468,6 @@ func (cs *taskSuite) TestMethodEntrance(c *C) {
func() { t1.WaitTasks() },
func() { t1.HaltTasks() },
func() { t1.Progress() },
func() { t1.Log() },
func() { t1.MarshalJSON() },
func() { t1.Progress() },
func() { t1.SetProgress("", 0, 1) },
Expand Down
Loading