Skip to content

Commit

Permalink
scaling: fix state store corruption bug for job scaling events
Browse files Browse the repository at this point in the history
When updating a `JobScalingEvent`, the state store function did not copy the
existing object before mutating it. This corrupts the state store because it
modifies the leaf node without committing it in a transaction. It can also cause
the Nomad server to crash with a "fatal error: concurrent map read and map
write" if its `ScalingEvents` map is read via the `ScaleStatus` RPC at the same
time as it's being written.

This changeset also removes some mostly-unused public methods on the struct that
dangerously encourage you to mutate it outside of a copy.

Ref: https://hashicorp.atlassian.net/browse/NET-10529
  • Loading branch information
tgross committed Jul 23, 2024
1 parent 7a2c70e commit b112548
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 19 deletions.
3 changes: 3 additions & 0 deletions .changelog/23673.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
scaling: Fixed a bug where state store corruption could occur when writing scaling events
```
2 changes: 1 addition & 1 deletion nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ func (s *StateStore) UpsertScalingEvent(index uint64, req *structs.ScalingEventR

var jobEvents *structs.JobScalingEvents
if existing != nil {
jobEvents = existing.(*structs.JobScalingEvents)
jobEvents = existing.(*structs.JobScalingEvents).Copy()
} else {
jobEvents = &structs.JobScalingEvents{
Namespace: req.Namespace,
Expand Down
15 changes: 9 additions & 6 deletions nomad/state/state_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10636,9 +10636,10 @@ func TestStateStore_UpsertScalingEvent(t *testing.T) {
job := mock.Job()
groupName := job.TaskGroups[0].Name

newEvent := structs.NewScalingEvent("message 1").SetMeta(map[string]interface{}{
newEvent := structs.NewScalingEvent("message 1")
newEvent.Meta = map[string]interface{}{
"a": 1,
})
}

wsAll := memdb.NewWatchSet()
all, err := state.ScalingEvents(wsAll)
Expand Down Expand Up @@ -10709,10 +10710,11 @@ func TestStateStore_UpsertScalingEvent_LimitAndOrder(t *testing.T) {

index := uint64(1000)
for i := 1; i <= structs.JobTrackedScalingEvents+10; i++ {
newEvent := structs.NewScalingEvent("").SetMeta(map[string]interface{}{
newEvent := structs.NewScalingEvent("")
newEvent.Meta = map[string]interface{}{
"i": i,
"group": group1,
})
}
err := state.UpsertScalingEvent(index, &structs.ScalingEventRequest{
Namespace: namespace,
JobID: jobID,
Expand All @@ -10722,10 +10724,11 @@ func TestStateStore_UpsertScalingEvent_LimitAndOrder(t *testing.T) {
index++
require.NoError(err)

newEvent = structs.NewScalingEvent("").SetMeta(map[string]interface{}{
newEvent = structs.NewScalingEvent("")
newEvent.Meta = map[string]interface{}{
"i": i,
"group": group2,
})
}
err = state.UpsertScalingEvent(index, &structs.ScalingEventRequest{
Namespace: namespace,
JobID: jobID,
Expand Down
37 changes: 25 additions & 12 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6096,6 +6096,21 @@ type JobScalingEvents struct {
ModifyIndex uint64
}

// Copy returns a new JobScalingEvents carrying the same field values as j but
// with its own ScalingEvents map, so the result can be modified without
// writing to the map owned by j. A nil receiver yields nil.
func (j *JobScalingEvents) Copy() *JobScalingEvents {
	if j == nil {
		return nil
	}
	njse := new(JobScalingEvents)
	*njse = *j

	// Always allocate a fresh map (even when j.ScalingEvents is nil or empty),
	// sized up front for the number of task groups being copied.
	njse.ScalingEvents = make(map[string][]*ScalingEvent, len(j.ScalingEvents))
	for taskGroup, events := range j.ScalingEvents {
		// NOTE(review): helper.CopySlice is expected to copy each event via
		// ScalingEvent.Copy rather than alias the pointers — confirm in helper.
		njse.ScalingEvents[taskGroup] = helper.CopySlice(events)
	}

	return njse
}

// NewScalingEvent method for ScalingEvent objects.
func NewScalingEvent(message string) *ScalingEvent {
return &ScalingEvent{
Expand Down Expand Up @@ -6131,19 +6146,17 @@ type ScalingEvent struct {
CreateIndex uint64
}

func (e *ScalingEvent) SetError(error bool) *ScalingEvent {
e.Error = error
return e
}

func (e *ScalingEvent) SetMeta(meta map[string]interface{}) *ScalingEvent {
e.Meta = meta
return e
}
func (e *ScalingEvent) Copy() *ScalingEvent {
if e == nil {
return nil
}
ne := new(ScalingEvent)
*ne = *e

func (e *ScalingEvent) SetEvalID(evalID string) *ScalingEvent {
e.EvalID = &evalID
return e
ne.Count = pointer.Copy(e.Count)
ne.Meta = maps.Clone(e.Meta)
ne.EvalID = pointer.Copy(e.EvalID)
return ne
}

// ScalingEventRequest is used by the Job.Scale endpoint
Expand Down

0 comments on commit b112548

Please sign in to comment.