Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

recover lessor before recovering mvcc store and transactionally revoke leases #6098

Merged
merged 5 commits into from
Aug 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,9 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {

srv.be = be
minTTL := time.Duration((3*cfg.ElectionTicks)/2) * time.Duration(cfg.TickMs) * time.Millisecond

// Always recover the lessor before the kv store. Recovering mvcc.KV reattaches keys to their leases,
// so if mvcc.KV were recovered first, it would attach the keys to a lessor that has not been recovered yet.
srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds())))
srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
if beExist {
Expand All @@ -404,6 +407,7 @@ func NewServer(cfg *ServerConfig) (srv *EtcdServer, err error) {
}
}
srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())

srv.authStore = auth.NewAuthStore(srv.be)
if h := cfg.AutoCompactionRetention; h != 0 {
srv.compactor = compactor.NewPeriodic(h, srv.kv, srv)
Expand Down Expand Up @@ -660,6 +664,14 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {

newbe := backend.NewDefaultBackend(fn)

// Always recover the lessor before the kv store. Recovering mvcc.KV reattaches keys to their leases,
// so if mvcc.KV were recovered first, it would attach the keys to a lessor that has not been recovered yet.
if s.lessor != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

short comment about why lessor is recovered before kv?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure.

plog.Info("recovering lessor...")
s.lessor.Recover(newbe, s.kv)
plog.Info("finished recovering lessor")
}

plog.Info("restoring mvcc store...")

if err := s.kv.Restore(newbe); err != nil {
Expand All @@ -686,12 +698,6 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
s.be = newbe
s.bemu.Unlock()

if s.lessor != nil {
plog.Info("recovering lessor...")
s.lessor.Recover(newbe, s.kv)
plog.Info("finished recovering lessor")
}

plog.Info("recovering alarms...")
if err := s.restoreAlarms(); err != nil {
plog.Panicf("restore alarms error: %v", err)
Expand Down
39 changes: 25 additions & 14 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,18 @@ var (

type LeaseID int64

// RangeDeleter defines an interface with DeleteRange method.
// RangeDeleter defines an interface with Txn and DeleteRange method.
// We define this interface only for lessor to limit the number
// of methods of mvcc.KV to what lessor actually needs.
//
// Having a minimum interface makes testing easy.
type RangeDeleter interface {
DeleteRange(key, end []byte) (int64, int64)
// TxnBegin see comments on mvcc.KV
TxnBegin() int64
// TxnEnd see comments on mvcc.KV
TxnEnd(txnID int64) error
// TxnDeleteRange see comments on mvcc.KV
TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
}

// Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
Expand Down Expand Up @@ -218,16 +223,30 @@ func (le *lessor) Revoke(id LeaseID) error {
// unlock before doing external work
le.mu.Unlock()

if le.rd != nil {
for item := range l.itemSet {
le.rd.DeleteRange([]byte(item.Key), nil)
if le.rd == nil {
return nil
}

tid := le.rd.TxnBegin()
for item := range l.itemSet {
_, _, err := le.rd.TxnDeleteRange(tid, []byte(item.Key), nil)
if err != nil {
panic(err)
}
}

le.mu.Lock()
defer le.mu.Unlock()
delete(le.leaseMap, l.ID)
l.removeFrom(le.b)
// The lease deletion must be in the same backend transaction as the
// kv deletion. Otherwise, if etcdserver fails in between, we might end
// up either not executing the revoke or not deleting the keys.
le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))

err := le.rd.TxnEnd(tid)
if err != nil {
panic(err)
}

return nil
}
Expand Down Expand Up @@ -453,14 +472,6 @@ func (l Lease) persistTo(b backend.Backend) {
b.BatchTx().Unlock()
}

// removeFrom deletes this lease's record from the lease bucket of backend b.
// The record key is the lease ID converted with int64ToBytes.
func (l Lease) removeFrom(b backend.Backend) {
key := int64ToBytes(int64(l.ID))

// The delete is bracketed by Lock/Unlock on the backend's batch transaction.
b.BatchTx().Lock()
b.BatchTx().UnsafeDelete(leaseBucketName, key)
b.BatchTx().Unlock()
}

// refresh refreshes the expiry of the lease.
func (l *Lease) refresh(extend time.Duration) {
l.expiry = time.Now().Add(extend + time.Second*time.Duration(l.TTL))
Expand Down
12 changes: 10 additions & 2 deletions lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,17 @@ type fakeDeleter struct {
deleted []string
}

func (fd *fakeDeleter) DeleteRange(key, end []byte) (int64, int64) {
// TxnBegin is a test stub: it does no work and always returns transaction ID 0.
func (fd *fakeDeleter) TxnBegin() int64 {
return 0
}

// TxnEnd is a test stub: it ignores txnID and always returns a nil error.
func (fd *fakeDeleter) TxnEnd(txnID int64) error {
return nil
}

func (fd *fakeDeleter) TxnDeleteRange(tid int64, key, end []byte) (int64, int64, error) {
fd.deleted = append(fd.deleted, string(key)+"_"+string(end))
return 0, 0
return 0, 0, nil
}

func NewTestBackend(t *testing.T) (string, backend.Backend) {
Expand Down
41 changes: 21 additions & 20 deletions mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ func (s *store) restore() error {
revToBytes(revision{main: 1}, min)
revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)

keyToLease := make(map[string]lease.LeaseID)

// restore index
tx := s.b.BatchTx()
tx.Lock()
Expand All @@ -390,33 +392,32 @@ func (s *store) restore() error {
switch {
case isTombstone(key):
s.kvindex.Tombstone(kv.Key, rev)
if lease.LeaseID(kv.Lease) != lease.NoLease {
err := s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
if err != nil && err != lease.ErrLeaseNotFound {
plog.Fatalf("unexpected Detach error %v", err)
}
}
delete(keyToLease, string(kv.Key))

default:
s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
if lease.LeaseID(kv.Lease) != lease.NoLease {
if s.le == nil {
panic("no lessor to attach lease")
}
err := s.le.Attach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
// We are walking through the kv history here. It is possible that we attached a key to
// the lease and the lease was revoked later.
// Thus attaching an old version of a key to a non-existent lease is possible here, and
// we should just ignore the error.
if err != nil && err != lease.ErrLeaseNotFound {
panic("unexpected Attach error")
}

if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
keyToLease[string(kv.Key)] = lid
} else {
delete(keyToLease, string(kv.Key))
}
}

// update revision
s.currentRev = rev
}

for key, lid := range keyToLease {
if s.le == nil {
panic("no lessor to attach lease")
}
err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
if err != nil {
plog.Errorf("unexpected Attach error: %v", err)
}
}

_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
scheduledCompact := int64(0)
if len(scheduledCompactBytes) != 0 {
Expand Down Expand Up @@ -550,7 +551,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {

err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
panic("unexpected error from lease detach")
plog.Errorf("unexpected error from lease detach: %v", err)
}
}

Expand Down Expand Up @@ -619,7 +620,7 @@ func (s *store) delete(key []byte, rev revision) {
if lease.LeaseID(kv.Lease) != lease.NoLease {
err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
if err != nil {
plog.Fatalf("cannot detach %v", err)
plog.Errorf("cannot detach %v", err)
}
}
}
Expand Down