Skip to content

Commit

Permalink
Version service specs
Browse files Browse the repository at this point in the history
Adds fields to Service and Task that keep track of a version number for
ServiceSpecs. This version number is different from the service object
version (which is tied to the Raft index).

Then change the scheduler to use this, instead of marshalling the specs
to compare them (which didn't work reliably).

Also make the orchestrator use SpecVersion as an optimization, when
available.

Signed-off-by: Aaron Lehmann <[email protected]>
  • Loading branch information
aaronlehmann committed Mar 13, 2017
1 parent 155db94 commit ac42b98
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 97 deletions.
249 changes: 174 additions & 75 deletions api/objects.pb.go

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions api/objects.proto
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,21 @@ message Service {

ServiceSpec spec = 3 [(gogoproto.nullable) = false];

// SpecVersion versions Spec, to identify changes in the spec. This is
// not a Version because it uses a different versioning scheme from
// top-level objects and is not directly comparable to top-level object
// versions.
uint64 spec_version = 10;

// PreviousSpec is the previous service spec that was in place before
// "Spec".
ServiceSpec previous_spec = 6;

// PreviousSpecVersion versions PreviousSpec. This is not a Version
// because it uses a different versioning scheme from top-level
// objects and is not directly comparable to top-level object versions.
uint64 previous_spec_version = 11;

// Runtime state of service endpoint. This may be different
// from the spec version because the user may not have entered
// the optional fields like node_port or virtual_ip and it
Expand Down Expand Up @@ -131,6 +142,10 @@ message Task {
// The system will honor this and will *never* modify it.
TaskSpec spec = 3 [(gogoproto.nullable) = false];

// SpecVersion is copied from Service, to identify which version of the
// spec this task has.
uint64 spec_version = 14;

// ServiceID indicates the service under which this task is orchestrated. This
// should almost always be set.
string service_id = 4;
Expand Down
10 changes: 8 additions & 2 deletions manager/controlapi/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,8 +476,9 @@ func (s *Server) CreateService(ctx context.Context, request *api.CreateServiceRe
// TODO(aluzzardi): Consider using `Name` as a primary key to handle
// duplicate creations. See #65
service := &api.Service{
ID: identity.NewID(),
Spec: *request.Spec,
ID: identity.NewID(),
Spec: *request.Spec,
SpecVersion: 1,
}

if doesServiceNeedIngress(service) {
Expand Down Expand Up @@ -599,8 +600,11 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe
}

curSpec := service.Spec.Copy()
curSpecVersion := service.SpecVersion
service.Spec = *service.PreviousSpec.Copy()
service.SpecVersion = service.PreviousSpecVersion
service.PreviousSpec = curSpec
service.PreviousSpecVersion = curSpecVersion

service.UpdateStatus = &api.UpdateStatus{
State: api.UpdateStatus_ROLLBACK_STARTED,
Expand All @@ -609,7 +613,9 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe
}
} else {
service.PreviousSpec = service.Spec.Copy()
service.PreviousSpecVersion = service.SpecVersion
service.Spec = *request.Spec.Copy()
service.SpecVersion++

// Reset update status
service.UpdateStatus = nil
Expand Down
8 changes: 8 additions & 0 deletions manager/orchestrator/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func NewTask(cluster *api.Cluster, service *api.Service, slot uint64, nodeID str
ID: taskID,
ServiceAnnotations: service.Spec.Annotations,
Spec: service.Spec.Task,
SpecVersion: service.SpecVersion,
ServiceID: service.ID,
Slot: slot,
Status: api.TaskStatus{
Expand Down Expand Up @@ -62,6 +63,13 @@ func RestartCondition(task *api.Task) api.RestartPolicy_RestartCondition {

// IsTaskDirty determines whether a task matches the given service's spec.
func IsTaskDirty(s *api.Service, t *api.Task) bool {
// If the spec version matches, we know the task is not dirty. However,
// if it does not match, that doesn't mean the task is dirty, since
// only a portion of the spec is included in the comparison.
if t.SpecVersion != 0 && s.SpecVersion == t.SpecVersion {
return false
}

return !reflect.DeepEqual(s.Spec.Task, t.Spec) ||
(t.Endpoint != nil && !reflect.DeepEqual(s.Spec.Endpoint, t.Endpoint.Spec))
}
Expand Down
40 changes: 23 additions & 17 deletions manager/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,12 @@ func (s *Scheduler) processPreassignedTasks(ctx context.Context) {

// tick attempts to schedule the queue.
func (s *Scheduler) tick(ctx context.Context) {
tasksByCommonSpec := make(map[string]map[string]*api.Task)
type commonSpecKey struct {
serviceID string
specVersion uint64
}
tasksByCommonSpec := make(map[commonSpecKey]map[string]*api.Task)
var oneOffTasks []*api.Task
schedulingDecisions := make(map[string]schedulingDecision, len(s.unassignedTasks))

for taskID, t := range s.unassignedTasks {
Expand All @@ -341,30 +346,31 @@ func (s *Scheduler) tick(ctx context.Context) {
continue
}

// Group common tasks with common specs by marshalling the spec
// into taskKey and using it as a map key.
// TODO(aaronl): Once specs are versioned, this will allow a
// much more efficient fast path.
fieldsToMarshal := api.Task{
ServiceID: t.ServiceID,
Spec: t.Spec,
}
marshalled, err := fieldsToMarshal.Marshal()
if err != nil {
panic(err)
}
taskGroupKey := string(marshalled)
// Group tasks with common specs
if t.SpecVersion != 0 {
taskGroupKey := commonSpecKey{
serviceID: t.ServiceID,
specVersion: t.SpecVersion,
}

if tasksByCommonSpec[taskGroupKey] == nil {
tasksByCommonSpec[taskGroupKey] = make(map[string]*api.Task)
if tasksByCommonSpec[taskGroupKey] == nil {
tasksByCommonSpec[taskGroupKey] = make(map[string]*api.Task)
}
tasksByCommonSpec[taskGroupKey][taskID] = t
} else {
// This task doesn't have a spec version. We have to
// schedule it as a one-off.
oneOffTasks = append(oneOffTasks, t)
}
tasksByCommonSpec[taskGroupKey][taskID] = t
delete(s.unassignedTasks, taskID)
}

for _, taskGroup := range tasksByCommonSpec {
s.scheduleTaskGroup(ctx, taskGroup, schedulingDecisions)
}
for _, t := range oneOffTasks {
s.scheduleTaskGroup(ctx, map[string]*api.Task{t.ID: t}, schedulingDecisions)
}

_, failed := s.applySchedulingDecisions(ctx, schedulingDecisions)
for _, decision := range failed {
Expand Down
34 changes: 31 additions & 3 deletions manager/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func TestScheduler(t *testing.T) {
assert.NotEqual(t, "id6", assignment8.NodeID)
}

func TestHA(t *testing.T) {
func testHA(t *testing.T, useSpecVersion bool) {
ctx := context.Background()
initialNodeSet := []*api.Node{
{
Expand Down Expand Up @@ -432,6 +432,11 @@ func TestHA(t *testing.T) {
},
}

if useSpecVersion {
taskTemplate1.SpecVersion = 1
taskTemplate2.SpecVersion = 1
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()
Expand Down Expand Up @@ -641,7 +646,12 @@ func TestHA(t *testing.T) {
assert.Equal(t, 1, t2Assignments["id1"])
}

func TestPreferences(t *testing.T) {
func TestHA(t *testing.T) {
t.Run("useSpecVersion=false", func(t *testing.T) { testHA(t, false) })
t.Run("useSpecVersion=true", func(t *testing.T) { testHA(t, true) })
}

func testPreferences(t *testing.T, useSpecVersion bool) {
ctx := context.Background()
initialNodeSet := []*api.Node{
{
Expand Down Expand Up @@ -737,6 +747,10 @@ func TestPreferences(t *testing.T) {
},
}

if useSpecVersion {
taskTemplate1.SpecVersion = 1
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()
Expand Down Expand Up @@ -785,7 +799,12 @@ func TestPreferences(t *testing.T) {
assert.Equal(t, 1, t1Assignments["id5"])
}

func TestMultiplePreferences(t *testing.T) {
func TestPreferences(t *testing.T) {
t.Run("useSpecVersion=false", func(t *testing.T) { testPreferences(t, false) })
t.Run("useSpecVersion=true", func(t *testing.T) { testPreferences(t, true) })
}

func testMultiplePreferences(t *testing.T, useSpecVersion bool) {
ctx := context.Background()
initialNodeSet := []*api.Node{
{
Expand Down Expand Up @@ -968,6 +987,10 @@ func TestMultiplePreferences(t *testing.T) {
},
}

if useSpecVersion {
taskTemplate1.SpecVersion = 1
}

s := store.NewMemoryStore(nil)
assert.NotNil(t, s)
defer s.Close()
Expand Down Expand Up @@ -1057,6 +1080,11 @@ func TestMultiplePreferences(t *testing.T) {
}
}

func TestMultiplePreferences(t *testing.T) {
t.Run("useSpecVersion=false", func(t *testing.T) { testMultiplePreferences(t, false) })
t.Run("useSpecVersion=true", func(t *testing.T) { testMultiplePreferences(t, true) })
}

func TestSchedulerNoReadyNodes(t *testing.T) {
ctx := context.Background()
initialTask := &api.Task{
Expand Down

0 comments on commit ac42b98

Please sign in to comment.