Skip to content

Commit

Permalink
Merge pull request #6098 from xiang90/lease
Browse files Browse the repository at this point in the history
Fix Lease
  • Loading branch information
xiang90 authored Aug 5, 2016
2 parents 6c3efde + d69d438 commit 4a7fabd
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 42 deletions.
18 changes: 12 additions & 6 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,9 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {

srv.be = be
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * time.Duration(cfg.TickMs) * time.Millisecond

// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds())))
srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
if beExist {
Expand All @@ -404,6 +407,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
}
}
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())

srv.authStore = auth.NewAuthStore(srv.be)
if h := cfg.AutoCompactionRetention; h != 0 {
srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
Expand Down Expand Up @@ -660,6 +664,14 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {

newbe := backend.NewDefaultBackend(fn)

// always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
// If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
if s.lessor != nil {
plog.Info("recovering lessor...")
s.lessor.Recover(newbe, s.kv)
plog.Info("finished recovering lessor")
}

plog.Info("restoring mvcc store...")

if err := s.kv.Restore(newbe); err != nil {
Expand All @@ -686,12 +698,6 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
s.be = newbe
s.bemu.Unlock()

if s.lessor != nil {
plog.Info("recovering lessor...")
s.lessor.Recover(newbe, s.kv)
plog.Info("finished recovering lessor")
}

plog.Info("recovering alarms...")
if err := s.restoreAlarms(); err != nil {
plog.Panicf("restore alarms error: %v", err)
Expand Down
39 changes: 25 additions & 14 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,18 @@ var (

type LeaseID int64

// RangeDeleter defines an interface with DeleteRange method.
// RangeDeleter defines an interface with Txn and DeleteRange method.
// We define this interface only for lessor to limit the number
// of methods of mvcc.KV to what lessor actually needs.
//
// Having a minimum interface makes testing easy.
type RangeDeleter interface {
DeleteRange(key, end []byte) (int64, int64)
// TxnBegin see comments on mvcc.KV
TxnBegin() int64
// TxnEnd see comments on mvcc.KV
TxnEnd(txnID int64) error
// TxnDeleteRange see comments on mvcc.KV
TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
}

// Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
Expand Down Expand Up @@ -218,16 +223,30 @@ func (le *lessor) Revoke(id LeaseID) error {
// unlock before doing external work
le.mu.Unlock()

if le.rd != nil {
for item := range l.itemSet {
le.rd.DeleteRange([]byte(item.Key), nil)
if le.rd == nil {
return nil
}

tid := le.rd.TxnBegin()
for item := range l.itemSet {
_, _, err := le.rd.TxnDeleteRange(tid, []byte(item.Key), nil)
if err != nil {
panic(err)
}
}

le.mu.Lock()
defer le.mu.Unlock()
delete(le.leaseMap, l.ID)
l.removeFrom(le.b)
// lease deletion needs to be in the same backend transaction with the
// kv deletion. Or we might end up with not executing the revoke or not
// deleting the keys if etcdserver fails in between.
le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))

err := le.rd.TxnEnd(tid)
if err != nil {
panic(err)
}

return nil
}
Expand Down Expand Up @@ -453,14 +472,6 @@ func (l Lease) persistTo(b backend.Backend) {
b.BatchTx().Unlock()
}

func (l Lease) removeFrom(b backend.Backend) {
key := int64ToBytes(int64(l.ID))

b.BatchTx().Lock()
b.BatchTx().UnsafeDelete(leaseBucketName, key)
b.BatchTx().Unlock()
}

// refresh refreshes the expiry of the lease.
func (l *Lease) refresh(extend time.Duration) {
l.expiry = time.Now().Add(extend + time.Second*time.Duration(l.TTL))
Expand Down
12 changes: 10 additions & 2 deletions lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,17 @@ type fakeDeleter struct {
deleted []string
}

func (fd *fakeDeleter) DeleteRange(key, end []byte) (int64, int64) {
func (fd *fakeDeleter) TxnBegin() int64 {
return 0
}

func (fd *fakeDeleter) TxnEnd(txnID int64) error {
return nil
}

func (fd *fakeDeleter) TxnDeleteRange(tid int64, key, end []byte) (int64, int64, error) {
fd.deleted = append(fd.deleted, string(key)+"_"+string(end))
return 0, 0
return 0, 0, nil
}

func NewTestBackend(t *testing.T) (string, backend.Backend) {
Expand Down
41 changes: 21 additions & 20 deletions mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ func (s *store) restore() error {
revToBytes(revision{main: 1}, min)
revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)

keyToLease := make(map[string]lease.LeaseID)

// restore index
tx := s.b.BatchTx()
tx.Lock()
Expand All @@ -390,33 +392,32 @@ func (s *store) restore() error {
switch {
case isTombstone(key):
s.kvindex.Tombstone(kv.Key, rev)
if lease.LeaseID(kv.Lease) != lease.NoLease {
err := s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
if err != nil && err != lease.ErrLeaseNotFound {
plog.Fatalf("unexpected Detach error %v", err)
}
}
delete(keyToLease, string(kv.Key))

default:
s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
if lease.LeaseID(kv.Lease) != lease.NoLease {
if s.le == nil {
panic("no lessor to attach lease")
}
err := s.le.Attach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
// We are walking through the kv history here. It is possible that we attached a key to
// the lease and the lease was revoked later.
// Thus attaching an old version of key to a none existing lease is possible here, and
// we should just ignore the error.
if err != nil && err != lease.ErrLeaseNotFound {
panic("unexpected Attach error")
}

if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
keyToLease[string(kv.Key)] = lid
} else {
delete(keyToLease, string(kv.Key))
}
}

// update revision
s.currentRev = rev
}

for key, lid := range keyToLease {
if s.le == nil {
panic("no lessor to attach lease")
}
err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
if err != nil {
plog.Errorf("unexpected Attach error: %v", err)
}
}

_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
scheduledCompact := int64(0)
if len(scheduledCompactBytes) != 0 {
Expand Down Expand Up @@ -550,7 +551,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {

err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
panic("unexpected error from lease detach")
plog.Errorf("unexpected error from lease detach: %v", err)
}
}

Expand Down Expand Up @@ -619,7 +620,7 @@ func (s *store) delete(key []byte, rev revision) {
if lease.LeaseID(kv.Lease) != lease.NoLease {
err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
if err != nil {
plog.Fatalf("cannot detach %v", err)
plog.Errorf("cannot detach %v", err)
}
}
}
Expand Down

0 comments on commit 4a7fabd

Please sign in to comment.