From 29a077bdbe287c796775ead254506c6b6c794a6b Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 4 Aug 2016 08:06:19 -0700 Subject: [PATCH 1/5] etcdserver: always recover lessor first --- etcdserver/server.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 4da708f6d32..41c82944b79 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -660,6 +660,12 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { newbe := backend.NewDefaultBackend(fn) + if s.lessor != nil { + plog.Info("recovering lessor...") + s.lessor.Recover(newbe, s.kv) + plog.Info("finished recovering lessor") + } + plog.Info("restoring mvcc store...") if err := s.kv.Restore(newbe); err != nil { @@ -686,12 +692,6 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { s.be = newbe s.bemu.Unlock() - if s.lessor != nil { - plog.Info("recovering lessor...") - s.lessor.Recover(newbe, s.kv) - plog.Info("finished recovering lessor") - } - plog.Info("recovering alarms...") if err := s.restoreAlarms(); err != nil { plog.Panicf("restore alarms error: %v", err) From 4d59b6f52cbc656d8ad58cd0a291b5d400c18630 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 4 Aug 2016 08:06:33 -0700 Subject: [PATCH 2/5] lease: delete kvs in a txn --- lease/lessor.go | 20 +++++++++++++++++--- lease/lessor_test.go | 12 ++++++++++-- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/lease/lessor.go b/lease/lessor.go index 45c8c252012..b2775017762 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -43,13 +43,18 @@ var ( type LeaseID int64 -// RangeDeleter defines an interface with DeleteRange method. +// RangeDeleter defines an interface with Txn and DeleteRange method. // We define this interface only for lessor to limit the number // of methods of mvcc.KV to what lessor actually needs. // // Having a minimum interface makes testing easy. type RangeDeleter interface { - DeleteRange(key, end []byte) (int64, int64) + // TxnBegin see comments on mvcc.KV + TxnBegin() int64 + // TxnEnd see comments on mvcc.KV + TxnEnd(txnID int64) error + // TxnDeleteRange see comments on mvcc.KV + TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) } // Lessor owns leases. It can grant, revoke, renew and modify leases for lessee. @@ -219,8 +224,17 @@ func (le *lessor) Revoke(id LeaseID) error { le.mu.Unlock() if le.rd != nil { + tid := le.rd.TxnBegin() for item := range l.itemSet { - le.rd.DeleteRange([]byte(item.Key), nil) + _, _, err := le.rd.TxnDeleteRange(tid, []byte(item.Key), nil) + if err != nil { + panic(err) + } + } + + err := le.rd.TxnEnd(tid) + if err != nil { + panic(err) } } diff --git a/lease/lessor_test.go b/lease/lessor_test.go index 946e8c03b69..adc682c8174 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -225,9 +225,17 @@ type fakeDeleter struct { deleted []string } -func (fd *fakeDeleter) DeleteRange(key, end []byte) (int64, int64) { +func (fd *fakeDeleter) TxnBegin() int64 { + return 0 +} + +func (fd *fakeDeleter) TxnEnd(txnID int64) error { + return nil +} + +func (fd *fakeDeleter) TxnDeleteRange(tid int64, key, end []byte) (int64, int64, error) { fd.deleted = append(fd.deleted, string(key)+"_"+string(end)) - return 0, 0 + return 0, 0, nil } func NewTestBackend(t *testing.T) (string, backend.Backend) { From 75c06cacae6a7026b14ee24179ceb56c0cbb1029 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 4 Aug 2016 08:35:15 -0700 Subject: [PATCH 3/5] lease: do lease delection in the kv txn --- lease/lessor.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/lease/lessor.go b/lease/lessor.go index b2775017762..a04d14981eb 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -232,17 +232,20 @@ func (le *lessor) Revoke(id LeaseID) error { } } + le.mu.Lock() + defer le.mu.Unlock() + delete(le.leaseMap, l.ID) + // lease deletion needs to be in the same backend transcation with the + // kv deletion. Or we might end up with not executing the revoke or not + // deleting the keys if etcdserver fails in between. + l.removeFrom(le.b) + err := le.rd.TxnEnd(tid) if err != nil { panic(err) } } - le.mu.Lock() - defer le.mu.Unlock() - delete(le.leaseMap, l.ID) - l.removeFrom(le.b) - return nil } @@ -470,9 +473,7 @@ func (l Lease) persistTo(b backend.Backend) { func (l Lease) removeFrom(b backend.Backend) { key := int64ToBytes(int64(l.ID)) - b.BatchTx().Lock() b.BatchTx().UnsafeDelete(leaseBucketName, key) - b.BatchTx().Unlock() } // refresh refreshes the expiry of the lease. From bd62b0a6463ff480c25f26d4c81efe8a514bebed Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 4 Aug 2016 11:17:56 -0700 Subject: [PATCH 4/5] mvcc: attach keys to leases after recover all state The previous logic is wrong. When we have hisotry like Put(foo, bar, lease1), and Put(foo, bar, lease2), we will end up with attaching foo to two leases 1 and 2. Similar things can happen for deattach by clearing the lease of a key. Now we try to fix this by starting to attach leases at the end of the recovery. We use a map to keep the last lease attachment state. --- mvcc/kvstore.go | 41 +++++++++++++++++++++-------------------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/mvcc/kvstore.go b/mvcc/kvstore.go index d1c71fd7c1a..249224221dc 100644 --- a/mvcc/kvstore.go +++ b/mvcc/kvstore.go @@ -367,6 +367,8 @@ func (s *store) restore() error { revToBytes(revision{main: 1}, min) revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max) + keyToLease := make(map[string]lease.LeaseID) + // restore index tx := s.b.BatchTx() tx.Lock() @@ -390,26 +392,15 @@ func (s *store) restore() error { switch { case isTombstone(key): s.kvindex.Tombstone(kv.Key, rev) - if lease.LeaseID(kv.Lease) != lease.NoLease { - err := s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}}) - if err != nil && err != lease.ErrLeaseNotFound { - plog.Fatalf("unexpected Detach error %v", err) - } - } + delete(keyToLease, string(kv.Key)) + default: s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version) - if lease.LeaseID(kv.Lease) != lease.NoLease { - if s.le == nil { - panic("no lessor to attach lease") - } - err := s.le.Attach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}}) - // We are walking through the kv history here. It is possible that we attached a key to - // the lease and the lease was revoked later. - // Thus attaching an old version of key to a none existing lease is possible here, and - // we should just ignore the error. - if err != nil && err != lease.ErrLeaseNotFound { - panic("unexpected Attach error") - } + + if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease { + keyToLease[string(kv.Key)] = lid + } else { + delete(keyToLease, string(kv.Key)) } } @@ -417,6 +408,16 @@ func (s *store) restore() error { s.currentRev = rev } + for key, lid := range keyToLease { + if s.le == nil { + panic("no lessor to attach lease") + } + err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}}) + if err != nil { + plog.Errorf("unexpected Attach error: %v", err) + } + } + _, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0) scheduledCompact := int64(0) if len(scheduledCompactBytes) != 0 { @@ -550,7 +551,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) { err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}}) if err != nil { - panic("unexpected error from lease detach") + plog.Errorf("unexpected error from lease detach: %v", err) } } @@ -619,7 +620,7 @@ func (s *store) delete(key []byte, rev revision) { if lease.LeaseID(kv.Lease) != lease.NoLease { err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}}) if err != nil { - plog.Fatalf("cannot detach %v", err) + plog.Errorf("cannot detach %v", err) } } } From d69d43828946d9f9cdb7516198fec6c563be1131 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Thu, 4 Aug 2016 20:39:32 -0700 Subject: [PATCH 5/5] *: minor cleanup for lease --- etcdserver/server.go | 6 ++++++ lease/lessor.go | 42 +++++++++++++++++++----------------------- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 41c82944b79..c0732b883fc 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -395,6 +395,9 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { srv.be = be minTTL := time.Duration((3*cfg.ElectionTicks)/2) * time.Duration(cfg.TickMs) * time.Millisecond + + // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. + // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers. srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds()))) srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex) if beExist { @@ -404,6 +407,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) { } } srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex()) + srv.authStore = auth.NewAuthStore(srv.be) if h := cfg.AutoCompactionRetention; h != 0 { srv.compactor = compactor.NewPeriodic(h, srv.kv, srv) @@ -660,6 +664,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { newbe := backend.NewDefaultBackend(fn) + // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. + // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers. if s.lessor != nil { plog.Info("recovering lessor...") s.lessor.Recover(newbe, s.kv) diff --git a/lease/lessor.go b/lease/lessor.go index a04d14981eb..682d7376b76 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -223,29 +223,31 @@ func (le *lessor) Revoke(id LeaseID) error { // unlock before doing external work le.mu.Unlock() - if le.rd != nil { - tid := le.rd.TxnBegin() - for item := range l.itemSet { - _, _, err := le.rd.TxnDeleteRange(tid, []byte(item.Key), nil) - if err != nil { - panic(err) - } - } + if le.rd == nil { + return nil + } - le.mu.Lock() - defer le.mu.Unlock() - delete(le.leaseMap, l.ID) - // lease deletion needs to be in the same backend transcation with the - // kv deletion. Or we might end up with not executing the revoke or not - // deleting the keys if etcdserver fails in between. - l.removeFrom(le.b) - - err := le.rd.TxnEnd(tid) + tid := le.rd.TxnBegin() + for item := range l.itemSet { + _, _, err := le.rd.TxnDeleteRange(tid, []byte(item.Key), nil) if err != nil { panic(err) } } + le.mu.Lock() + defer le.mu.Unlock() + delete(le.leaseMap, l.ID) + // lease deletion needs to be in the same backend transaction with the + // kv deletion. Or we might end up with not executing the revoke or not + // deleting the keys if etcdserver fails in between. + le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID))) + + err := le.rd.TxnEnd(tid) + if err != nil { + panic(err) + } + return nil } @@ -470,12 +472,6 @@ func (l Lease) persistTo(b backend.Backend) { b.BatchTx().Unlock() } -func (l Lease) removeFrom(b backend.Backend) { - key := int64ToBytes(int64(l.ID)) - - b.BatchTx().UnsafeDelete(leaseBucketName, key) -} - // refresh refreshes the expiry of the lease. func (l *Lease) refresh(extend time.Duration) { l.expiry = time.Now().Add(extend + time.Second*time.Duration(l.TTL))