From 75488007bcbf18969f023be4cb15aac7baf3745e Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Fri, 9 Nov 2018 18:30:53 -0500 Subject: [PATCH 1/2] Run job deregistering in a single transaction Fixes https://github.com/hashicorp/nomad/issues/4299 Upon investigating this case further, we determined the issue to be a race between applying `JobBatchDeregisterRequest` fsm operation and processing job-deregister evals. Processing job-deregister evals should wait until the FSM log message finishes applying, by using the snapshot index. However, with `JobBatchDeregister`, any single individual job deregistering was applied accidentally incremented the snapshot index and resulted into processing job-deregister evals. When a Nomad server receives an eval for a job in the batch that is yet to be deleted, we accidentally re-run it depending on the state of allocation. This change ensures that we delete deregister all of the jobs and inserts all evals in a single transactions, thus blocking processing related evals until deregistering complete. --- nomad/fsm.go | 44 +++++++++++++++++++++++------------ nomad/state/state_store.go | 47 +++++++++++++++++++++++++++++++++++--- 2 files changed, 74 insertions(+), 17 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 71b4c07bdab..722801134fc 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -503,12 +503,14 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge); err != nil { - n.logger.Error("deregistering job failed", "error", err) - return err - } + return n.state.WithWriteTransaction(func(tx state.Txn) error { + if err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx); err != nil { + n.logger.Error("deregistering job failed", "error", err) + return err + } - return nil + return nil + }) } func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} { @@ -518,18 +520,32 @@ func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} panic(fmt.Errorf("failed to decode request: %v", err)) } - for jobNS, options := range req.Jobs { - if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge); err != nil { - n.logger.Error("deregistering job failed", "job", jobNS, "error", err) + err := n.state.WithWriteTransaction(func(tx state.Txn) error { + for jobNS, options := range req.Jobs { + if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, tx); err != nil { + n.logger.Error("deregistering job failed", "job", jobNS, "error", err) + return err + } + } + + if err := n.state.UpsertEvalsTxn(index, req.Evals, tx); err != nil { + n.logger.Error("UpsertEvals failed", "error", err) return err } + + return nil + }) + + if err != nil { + return err } - return n.upsertEvals(index, req.Evals) + n.handleUpsertedEvals(req.Evals) + return nil } // handleJobDeregister is used to deregister a job. -func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool) error { +func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, tx state.Txn) error { // If it is periodic remove it from the dispatcher if err := n.periodicDispatcher.Remove(namespace, jobID); err != nil { n.logger.Error("periodicDispatcher.Remove failed", "error", err) @@ -537,7 +553,7 @@ func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, pu } if purge { - if err := n.state.DeleteJob(index, namespace, jobID); err != nil { + if err := n.state.DeleteJobTxn(index, namespace, jobID, tx); err != nil { n.logger.Error("DeleteJob failed", "error", err) return err } @@ -545,11 +561,11 @@ func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, pu // We always delete from the periodic launch table because it is possible that // the job was updated to be non-periodic, thus checking if it is periodic // doesn't ensure we clean it up properly. - n.state.DeletePeriodicLaunch(index, namespace, jobID) + n.state.DeletePeriodicLaunchTxn(index, namespace, jobID, tx) } else { // Get the current job and mark it as stopped and re-insert it. ws := memdb.NewWatchSet() - current, err := n.state.JobByID(ws, namespace, jobID) + current, err := n.state.JobByIDTxn(ws, namespace, jobID, tx) if err != nil { n.logger.Error("JobByID lookup failed", "error", err) return err @@ -562,7 +578,7 @@ func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, pu stopped := current.Copy() stopped.Stop = true - if err := n.state.UpsertJob(index, stopped); err != nil { + if err := n.state.UpsertJobTxn(index, stopped, tx); err != nil { n.logger.Error("UpsertJob failed", "error", err) return err } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index c2fcc4162fe..1b953fc64c6 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -14,6 +14,8 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +type Txn = *memdb.Txn + const ( // NodeRegisterEventReregistered is the message used when the node becomes // reregistered. @@ -923,6 +925,10 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { return nil } +func (s *StateStore) UpsertJobTxn(index uint64, job *structs.Job, txn Txn) error { + return s.upsertJobImpl(index, job, false, txn) +} + // upsertJobImpl is the implementation for registering a job or updating a job definition func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion bool, txn *memdb.Txn) error { // COMPAT 0.7: Upgrade old objects that do not have namespaces @@ -1006,6 +1012,14 @@ func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error { txn := s.db.Txn(true) defer txn.Abort() + err := s.DeleteJobTxn(index, namespace, jobID, txn) + if err == nil { + txn.Commit() + } + return err +} + +func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn) error { // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { namespace = structs.DefaultNamespace @@ -1092,7 +1106,6 @@ func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error { return fmt.Errorf("index update failed: %v", err) } - txn.Commit() return nil } @@ -1190,7 +1203,10 @@ func (s *StateStore) upsertJobVersion(index uint64, job *structs.Job, txn *memdb // version. func (s *StateStore) JobByID(ws memdb.WatchSet, namespace, id string) (*structs.Job, error) { txn := s.db.Txn(false) + return s.JobByIDTxn(ws, namespace, id, txn) +} +func (s *StateStore) JobByIDTxn(ws memdb.WatchSet, namespace, id string, txn Txn) (*structs.Job, error) { // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { namespace = structs.DefaultNamespace @@ -1511,6 +1527,14 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, namespace, jobID string) txn := s.db.Txn(true) defer txn.Abort() + err := s.DeletePeriodicLaunchTxn(index, namespace, jobID, txn) + if err == nil { + txn.Commit() + } + return err +} + +func (s *StateStore) DeletePeriodicLaunchTxn(index uint64, namespace, jobID string, txn Txn) error { // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { namespace = structs.DefaultNamespace @@ -1533,7 +1557,6 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, namespace, jobID string) return fmt.Errorf("index update failed: %v", err) } - txn.Commit() return nil } @@ -1580,6 +1603,14 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro txn := s.db.Txn(true) defer txn.Abort() + err := s.UpsertEvalsTxn(index, evals, txn) + if err == nil { + txn.Commit() + } + return err +} + +func (s *StateStore) UpsertEvalsTxn(index uint64, evals []*structs.Evaluation, txn Txn) error { // Do a nested upsert jobs := make(map[structs.NamespacedID]string, len(evals)) for _, eval := range evals { @@ -1599,7 +1630,6 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro return fmt.Errorf("setting job status failed: %v", err) } - txn.Commit() return nil } @@ -3889,6 +3919,17 @@ func (s *StateStore) SchedulerSetConfig(idx uint64, config *structs.SchedulerCon return nil } +func (s *StateStore) WithWriteTransaction(fn func(Txn) error) error { + tx := s.db.Txn(true) + defer tx.Abort() + + err := fn(tx) + if err == nil { + tx.Commit() + } + return err +} + // SchedulerCASConfig is used to update the scheduler configuration with a // given Raft index. If the CAS index specified is not equal to the last observed index // for the config, then the call is a noop. From 9405473329a83c91a1cf5c5bcf2ae47db0111007 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Mon, 12 Nov 2018 16:04:27 -0500 Subject: [PATCH 2/2] Comment public functions and batch write txn --- nomad/fsm.go | 4 ++++ nomad/state/state_store.go | 15 +++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/nomad/fsm.go b/nomad/fsm.go index 722801134fc..fa9d54aca7c 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -520,6 +520,9 @@ func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} panic(fmt.Errorf("failed to decode request: %v", err)) } + // Perform all store updates atomically to ensure a consistent views for store readers. + // A partial update may increment the snapshot index, allowing eval brokers to process + // evals for jobs whose deregistering didn't get committed yet. err := n.state.WithWriteTransaction(func(tx state.Txn) error { for jobNS, options := range req.Jobs { if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, tx); err != nil { @@ -540,6 +543,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} return err } + // perform the side effects outside the transactions n.handleUpsertedEvals(req.Evals) return nil } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 1b953fc64c6..191a2ea3b80 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -14,6 +14,8 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +// Txn is a transaction against a state store. +// This can be a read or write transaction. type Txn = *memdb.Txn const ( @@ -925,6 +927,8 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { return nil } +// UpsertJobTxn is used to register a job or update a job definition, like UpsertJob, +// but in a transcation. Useful for when making multiple modifications atomically func (s *StateStore) UpsertJobTxn(index uint64, job *structs.Job, txn Txn) error { return s.upsertJobImpl(index, job, false, txn) } @@ -1019,6 +1023,8 @@ func (s *StateStore) DeleteJob(index uint64, namespace, jobID string) error { return err } +// DeleteJobTxn is used to deregister a job, like DeleteJob, +// but in a transcation. Useful for when making multiple modifications atomically func (s *StateStore) DeleteJobTxn(index uint64, namespace, jobID string, txn Txn) error { // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { @@ -1206,6 +1212,8 @@ func (s *StateStore) JobByID(ws memdb.WatchSet, namespace, id string) (*structs. return s.JobByIDTxn(ws, namespace, id, txn) } +// JobByIDTxn is used to lookup a job by its ID, like JobByID. JobByID returns the job version +// accessable through in the transaction func (s *StateStore) JobByIDTxn(ws memdb.WatchSet, namespace, id string, txn Txn) (*structs.Job, error) { // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { @@ -1534,6 +1542,8 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, namespace, jobID string) return err } +// DeletePeriodicLaunchTxn is used to delete the periodic launch, like DeletePeriodicLaunch +// but in a transcation. Useful for when making multiple modifications atomically func (s *StateStore) DeletePeriodicLaunchTxn(index uint64, namespace, jobID string, txn Txn) error { // COMPAT 0.7: Upgrade old objects that do not have namespaces if namespace == "" { @@ -1610,6 +1620,8 @@ func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) erro return err } +// UpsertEvals is used to upsert a set of evaluations, like UpsertEvals +// but in a transcation. Useful for when making multiple modifications atomically func (s *StateStore) UpsertEvalsTxn(index uint64, evals []*structs.Evaluation, txn Txn) error { // Do a nested upsert jobs := make(map[structs.NamespacedID]string, len(evals)) @@ -3919,6 +3931,9 @@ func (s *StateStore) SchedulerSetConfig(idx uint64, config *structs.SchedulerCon return nil } +// WithWriteTransaction executes the passed function within a write transaction, +// and returns its result. If the invocation returns no error, the transaction +// is committed; otherwise, it's aborted. func (s *StateStore) WithWriteTransaction(fn func(Txn) error) error { tx := s.db.Txn(true) defer tx.Abort()