Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

recover lessor before recovering mvcc store and transactionally revoke leases #6098

Merged
merged 5 commits into from
Aug 5, 2016
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,12 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {

newbe := backend.NewDefaultBackend(fn)

if s.lessor != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

short comment about why lessor is recovered before kv?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure.

plog.Info("recovering lessor...")
s.lessor.Recover(newbe, s.kv)
plog.Info("finished recovering lessor")
}

plog.Info("restoring mvcc store...")

if err := s.kv.Restore(newbe); err != nil {
Expand All @@ -686,12 +692,6 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
s.be = newbe
s.bemu.Unlock()

if s.lessor != nil {
plog.Info("recovering lessor...")
s.lessor.Recover(newbe, s.kv)
plog.Info("finished recovering lessor")
}

plog.Info("recovering alarms...")
if err := s.restoreAlarms(); err != nil {
plog.Panicf("restore alarms error: %v", err)
Expand Down
35 changes: 25 additions & 10 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,18 @@ var (

type LeaseID int64

// RangeDeleter defines an interface with DeleteRange method.
// RangeDeleter defines an interface with Txn and DeleteRange method.
// We define this interface only for lessor to limit the number
// of methods of mvcc.KV to what lessor actually needs.
//
// Having a minimum interface makes testing easy.
type RangeDeleter interface {
DeleteRange(key, end []byte) (int64, int64)
// TxnBegin see comments on mvcc.KV
TxnBegin() int64
// TxnEnd see comments on mvcc.KV
TxnEnd(txnID int64) error
// TxnDeleteRange see comments on mvcc.KV
TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error)
}

// Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
Expand Down Expand Up @@ -219,15 +224,27 @@ func (le *lessor) Revoke(id LeaseID) error {
le.mu.Unlock()

if le.rd != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if le.rd == nil { return nil } to pull back the indent for the interesting path?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok.

tid := le.rd.TxnBegin()
for item := range l.itemSet {
le.rd.DeleteRange([]byte(item.Key), nil)
_, _, err := le.rd.TxnDeleteRange(tid, []byte(item.Key), nil)
if err != nil {
panic(err)
}
}
}

le.mu.Lock()
defer le.mu.Unlock()
delete(le.leaseMap, l.ID)
l.removeFrom(le.b)
le.mu.Lock()
defer le.mu.Unlock()
delete(le.leaseMap, l.ID)
// lease deletion needs to be in the same backend transcation with the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/transcation/transaction

// kv deletion. Or we might end up with not executing the revoke or not
// deleting the keys if etcdserver fails in between.
l.removeFrom(le.b)

err := le.rd.TxnEnd(tid)
if err != nil {
panic(err)
}
}

return nil
}
Expand Down Expand Up @@ -456,9 +473,7 @@ func (l Lease) persistTo(b backend.Backend) {
func (l Lease) removeFrom(b backend.Backend) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems simple enough to inline now since it's only used in one place

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure.

key := int64ToBytes(int64(l.ID))

b.BatchTx().Lock()
b.BatchTx().UnsafeDelete(leaseBucketName, key)
b.BatchTx().Unlock()
}

// refresh refreshes the expiry of the lease.
Expand Down
12 changes: 10 additions & 2 deletions lease/lessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,17 @@ type fakeDeleter struct {
deleted []string
}

func (fd *fakeDeleter) DeleteRange(key, end []byte) (int64, int64) {
func (fd *fakeDeleter) TxnBegin() int64 {
return 0
}

func (fd *fakeDeleter) TxnEnd(txnID int64) error {
return nil
}

func (fd *fakeDeleter) TxnDeleteRange(tid int64, key, end []byte) (int64, int64, error) {
fd.deleted = append(fd.deleted, string(key)+"_"+string(end))
return 0, 0
return 0, 0, nil
}

func NewTestBackend(t *testing.T) (string, backend.Backend) {
Expand Down
41 changes: 21 additions & 20 deletions mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ func (s *store) restore() error {
revToBytes(revision{main: 1}, min)
revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)

keyToLease := make(map[string]lease.LeaseID)

// restore index
tx := s.b.BatchTx()
tx.Lock()
Expand All @@ -390,33 +392,32 @@ func (s *store) restore() error {
switch {
case isTombstone(key):
s.kvindex.Tombstone(kv.Key, rev)
if lease.LeaseID(kv.Lease) != lease.NoLease {
err := s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
if err != nil && err != lease.ErrLeaseNotFound {
plog.Fatalf("unexpected Detach error %v", err)
}
}
delete(keyToLease, string(kv.Key))

default:
s.kvindex.Restore(kv.Key, revision{kv.CreateRevision, 0}, rev, kv.Version)
if lease.LeaseID(kv.Lease) != lease.NoLease {
if s.le == nil {
panic("no lessor to attach lease")
}
err := s.le.Attach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
// We are walking through the kv history here. It is possible that we attached a key to
// the lease and the lease was revoked later.
// Thus attaching an old version of key to a none existing lease is possible here, and
// we should just ignore the error.
if err != nil && err != lease.ErrLeaseNotFound {
panic("unexpected Attach error")
}

if lid := lease.LeaseID(kv.Lease); lid != lease.NoLease {
keyToLease[string(kv.Key)] = lid
} else {
delete(keyToLease, string(kv.Key))
}
}

// update revision
s.currentRev = rev
}

for key, lid := range keyToLease {
if s.le == nil {
panic("no lessor to attach lease")
}
err := s.le.Attach(lid, []lease.LeaseItem{{Key: key}})
if err != nil {
plog.Errorf("unexpected Attach error: %v", err)
}
}

_, scheduledCompactBytes := tx.UnsafeRange(metaBucketName, scheduledCompactKeyName, nil, 0)
scheduledCompact := int64(0)
if len(scheduledCompactBytes) != 0 {
Expand Down Expand Up @@ -550,7 +551,7 @@ func (s *store) put(key, value []byte, leaseID lease.LeaseID) {

err = s.le.Detach(oldLease, []lease.LeaseItem{{Key: string(key)}})
if err != nil {
panic("unexpected error from lease detach")
plog.Errorf("unexpected error from lease detach: %v", err)
}
}

Expand Down Expand Up @@ -619,7 +620,7 @@ func (s *store) delete(key []byte, rev revision) {
if lease.LeaseID(kv.Lease) != lease.NoLease {
err = s.le.Detach(lease.LeaseID(kv.Lease), []lease.LeaseItem{{Key: string(kv.Key)}})
if err != nil {
plog.Fatalf("cannot detach %v", err)
plog.Errorf("cannot detach %v", err)
}
}
}
Expand Down