Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: create placements for non-register MRD #15325

Merged
merged 2 commits into from
Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/15325.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
scheduler (Enterprise): Fixed a bug that prevented new allocations from multiregion jobs to be placed in situations where other regions are not involved, such as node updates.
```
31 changes: 31 additions & 0 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,10 +609,41 @@ func (n *nomadFSM) applyUpsertJob(msgType structs.MessageType, buf []byte, index
}
}

if req.Deployment != nil {
// Cancel any preivous deployment.
lastDeployment, err := n.state.LatestDeploymentByJobID(ws, req.Job.Namespace, req.Job.ID)
if err != nil {
return fmt.Errorf("failed to retrieve latest deployment: %v", err)
}
if lastDeployment != nil && lastDeployment.Active() {
activeDeployment := lastDeployment.Copy()
activeDeployment.Status = structs.DeploymentStatusCancelled
activeDeployment.StatusDescription = structs.DeploymentStatusDescriptionNewerJob
if err := n.state.UpsertDeployment(index, activeDeployment); err != nil {
return err
}
}

// Update the deployment with the latest job indexes.
req.Deployment.JobCreateIndex = req.Job.CreateIndex
req.Deployment.JobModifyIndex = req.Job.ModifyIndex
req.Deployment.JobSpecModifyIndex = req.Job.JobModifyIndex
req.Deployment.JobVersion = req.Job.Version

if err := n.state.UpsertDeployment(index, req.Deployment); err != nil {
return err
}
}

// COMPAT: Prior to Nomad 0.12.x evaluations were submitted in a separate Raft log,
// so this may be nil during server upgrades.
if req.Eval != nil {
req.Eval.JobModifyIndex = index

if req.Deployment != nil {
req.Eval.DeploymentID = req.Deployment.ID
}

if err := n.upsertEvals(msgType, index, []*structs.Evaluation{req.Eval}); err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,9 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
submittedEval = true
}

// Pre-register a deployment if necessary.
args.Deployment = j.multiregionCreateDeployment(job, eval)

// Commit this update via Raft
fsmErr, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err, ok := fsmErr.(error); ok && err != nil {
Expand Down
6 changes: 6 additions & 0 deletions nomad/job_endpoint_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ func (j *Job) enforceSubmitJob(override bool, job *structs.Job, nomadACLToken *s
return nil, nil
}

// multiregionCreateDeployment is used to create a deployment to register along
// with the job, if required.
func (j *Job) multiregionCreateDeployment(job *structs.Job, eval *structs.Evaluation) *structs.Deployment {
return nil
}

// multiregionRegister is used to send a job across multiple regions
func (j *Job) multiregionRegister(args *structs.JobRegisterRequest, reply *structs.JobRegisterResponse, newVersion uint64) (bool, error) {
return false, nil
Expand Down
3 changes: 1 addition & 2 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,8 +536,7 @@ func (s *StateStore) DeleteJobSummary(index uint64, namespace, id string) error
return txn.Commit()
}

// UpsertDeployment is used to insert a new deployment. If cancelPrior is set to
// true, all prior deployments for the same job will be cancelled.
// UpsertDeployment is used to insert or update a new deployment.
func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployment) error {
txn := s.db.WriteTxn(index)
defer txn.Abort()
Expand Down
24 changes: 15 additions & 9 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,10 @@ type JobRegisterRequest struct {
// Eval is the evaluation that is associated with the job registration
Eval *Evaluation

// Deployment is the deployment to be create when the job is registered. If
// there is an active deployment for the job it will be canceled.
Deployment *Deployment

WriteRequest
}

Expand Down Expand Up @@ -9216,14 +9220,15 @@ func (v *Vault) Validate() error {

const (
// DeploymentStatuses are the various states a deployment can be be in
DeploymentStatusRunning = "running"
DeploymentStatusPaused = "paused"
DeploymentStatusFailed = "failed"
DeploymentStatusSuccessful = "successful"
DeploymentStatusCancelled = "cancelled"
DeploymentStatusPending = "pending"
DeploymentStatusBlocked = "blocked"
DeploymentStatusUnblocking = "unblocking"
DeploymentStatusRunning = "running"
DeploymentStatusPaused = "paused"
DeploymentStatusFailed = "failed"
DeploymentStatusSuccessful = "successful"
DeploymentStatusCancelled = "cancelled"
DeploymentStatusInitializing = "initializing"
DeploymentStatusPending = "pending"
DeploymentStatusBlocked = "blocked"
DeploymentStatusUnblocking = "unblocking"

// TODO Statuses and Descriptions do not match 1:1 and we sometimes use the Description as a status flag

Expand Down Expand Up @@ -9357,7 +9362,8 @@ func (d *Deployment) Copy() *Deployment {
// Active returns whether the deployment is active or terminal.
func (d *Deployment) Active() bool {
switch d.Status {
case DeploymentStatusRunning, DeploymentStatusPaused, DeploymentStatusBlocked, DeploymentStatusUnblocking, DeploymentStatusPending:
case DeploymentStatusRunning, DeploymentStatusPaused, DeploymentStatusBlocked,
DeploymentStatusUnblocking, DeploymentStatusInitializing, DeploymentStatusPending:
return true
default:
return false
Expand Down
68 changes: 36 additions & 32 deletions scheduler/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,24 +231,35 @@ func (a *allocReconciler) computeDeploymentComplete(m allocMatrix) bool {
}

func (a *allocReconciler) computeDeploymentUpdates(deploymentComplete bool) {
// Mark the deployment as complete if possible
if a.deployment != nil && deploymentComplete {
if a.job.IsMultiregion() {
// the unblocking/successful states come after blocked, so we
// need to make sure we don't revert those states
if a.deployment.Status != structs.DeploymentStatusUnblocking &&
a.deployment.Status != structs.DeploymentStatusSuccessful {
if a.deployment != nil {
// Mark the deployment as complete if possible
if deploymentComplete {
if a.job.IsMultiregion() {
// the unblocking/successful states come after blocked, so we
// need to make sure we don't revert those states
if a.deployment.Status != structs.DeploymentStatusUnblocking &&
a.deployment.Status != structs.DeploymentStatusSuccessful {
a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: a.deployment.ID,
Status: structs.DeploymentStatusBlocked,
StatusDescription: structs.DeploymentStatusDescriptionBlocked,
})
}
} else {
a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: a.deployment.ID,
Status: structs.DeploymentStatusBlocked,
StatusDescription: structs.DeploymentStatusDescriptionBlocked,
Status: structs.DeploymentStatusSuccessful,
StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
})
}
} else {
}

// Mark the deployment as pending since its state is now computed.
if a.deployment.Status == structs.DeploymentStatusInitializing {
a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: a.deployment.ID,
Status: structs.DeploymentStatusSuccessful,
StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
Status: structs.DeploymentStatusPending,
StatusDescription: structs.DeploymentStatusDescriptionPendingForPeer,
})
}
}
Expand All @@ -269,26 +280,18 @@ func (a *allocReconciler) computeDeploymentUpdates(deploymentComplete bool) {
// allocReconciler that indicate the state of the deployment if one
// is required. The flags that are managed are:
// 1. deploymentFailed: Did the current deployment fail just as named.
// 2. deploymentPaused: Multiregion job types that use deployments run
// the deployments later during the fan-out stage. When the deployment
// is created it will be in a pending state. If an invariant violation
// is detected by the deploymentWatcher during it will enter a paused
// state. This flag tells Compute we're paused or pending, so we should
// not make placements on the deployment.
// 2. deploymentPaused: Set to true when the current deployment is paused,
// which is usually a manual user operation, or if the deployment is
// pending or initializing, which are the initial states for multi-region
// job deployments. This flag tells Compute that we should not make
// placements on the deployment.
func (a *allocReconciler) computeDeploymentPaused() {
if a.deployment != nil {
a.deploymentPaused = a.deployment.Status == structs.DeploymentStatusPaused ||
a.deployment.Status == structs.DeploymentStatusPending
a.deployment.Status == structs.DeploymentStatusPending ||
a.deployment.Status == structs.DeploymentStatusInitializing
a.deploymentFailed = a.deployment.Status == structs.DeploymentStatusFailed
}
if a.deployment == nil {
if a.job.IsMultiregion() &&
a.job.UsesDeployments() &&
!(a.job.IsPeriodic() || a.job.IsParameterized()) {

a.deploymentPaused = true
}
}
}

// cancelUnneededDeployments cancels any deployment that is not needed. If the
Expand Down Expand Up @@ -512,6 +515,12 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
a.computeMigrations(desiredChanges, migrate, tg, isCanarying)
a.createDeployment(tg.Name, tg.Update, existingDeployment, dstate, all, destructive)

// Deployments that are still initializing need to be sent in full in the
// plan so its internal state can be persisted by the plan applier.
if a.deployment != nil && a.deployment.Status == structs.DeploymentStatusInitializing {
a.result.deployment = a.deployment
}

deploymentComplete := a.isDeploymentComplete(groupName, destructive, inplace,
migrate, rescheduleNow, place, rescheduleLater, requiresCanaries)

Expand Down Expand Up @@ -889,11 +898,6 @@ func (a *allocReconciler) createDeployment(groupName string, strategy *structs.U
// A previous group may have made the deployment already. If not create one.
if a.deployment == nil {
a.deployment = structs.NewDeployment(a.job, a.evalPriority)
// in multiregion jobs, most deployments start in a pending state
if a.job.IsMultiregion() && !(a.job.IsPeriodic() && a.job.IsParameterized()) {
a.deployment.Status = structs.DeploymentStatusPending
a.deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer
}
a.result.deployment = a.deployment
}

Expand Down
42 changes: 26 additions & 16 deletions scheduler/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6074,34 +6074,34 @@ func TestReconciler_ComputeDeploymentPaused(t *testing.T) {
expected: true,
},
{
name: "multiregion periodic service is not paused",
jobType: structs.JobTypeService,
isMultiregion: true,
isPeriodic: true,
name: "single region batch job is not paused",
jobType: structs.JobTypeBatch,
isMultiregion: false,
isPeriodic: false,
isParameterized: false,
expected: false,
},
{
name: "multiregion parameterized service is not paused",
jobType: structs.JobTypeService,
isMultiregion: true,
name: "multiregion batch job is not paused",
jobType: structs.JobTypeBatch,
isMultiregion: false,
isPeriodic: false,
isParameterized: true,
isParameterized: false,
expected: false,
},
{
name: "single region batch job is not paused",
name: "multiregion parameterized batch is not paused",
jobType: structs.JobTypeBatch,
isMultiregion: false,
isMultiregion: true,
isPeriodic: false,
isParameterized: false,
isParameterized: true,
expected: false,
},
{
name: "multiregion batch job is not paused",
name: "multiregion periodic batch is not paused",
jobType: structs.JobTypeBatch,
isMultiregion: false,
isPeriodic: false,
isMultiregion: true,
isPeriodic: true,
isParameterized: false,
expected: false,
},
Expand All @@ -6119,8 +6119,18 @@ func TestReconciler_ComputeDeploymentPaused(t *testing.T) {

require.NotNil(t, job, "invalid job type", tc.jobType)

var deployment *structs.Deployment
if tc.isMultiregion {
job.Multiregion = multiregionCfg

// This deployment is created by the Job.Register RPC and
// fetched by the scheduler before handing it to the
// reconciler.
if job.UsesDeployments() {
deployment = structs.NewDeployment(job, 100)
deployment.Status = structs.DeploymentStatusInitializing
deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer
}
}

if tc.isPeriodic {
Expand All @@ -6132,8 +6142,8 @@ func TestReconciler_ComputeDeploymentPaused(t *testing.T) {
}

reconciler := NewAllocReconciler(
testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job,
nil, nil, nil, "", job.Priority, true)
testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, deployment,
nil, nil, "", job.Priority, true)

_ = reconciler.Compute()

Expand Down