Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lease: Add a heap to optimize lease expiration checks #9418

Merged
merged 4 commits into from
Apr 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-3.4.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ See [code changes](https://github.com/coreos/etcd/compare/v3.3.0...v3.4.0) and [
- The retention window of compaction period moves for every given compaction period or hour.
- For instance, when hourly writes are 100 and `--auto-compaction-mode=periodic --auto-compaction-retention=24h`, `v3.2.x`, `v3.3.0`, `v3.3.1`, and `v3.3.2` compact revision 2400, 2640, and 2880 for every 2.4-hour, while `v3.3.3` *or later* compacts revision 2400, 2500, 2600 for every 1-hour.
- Futhermore, when `--auto-compaction-mode=periodic --auto-compaction-retention=30m` and writes per minute are about 1000, `v3.3.0`, `v3.3.1`, and `v3.3.2` compact revision 30000, 33000, and 36000, for every 3-minute, while `v3.3.3` *or later* compacts revision 30000, 60000, and 90000, for every 30-minute.
- Improve [lease expire/revoke operation performance](https://github.com/coreos/etcd/pull/9418), address [lease scalability issue](https://github.com/coreos/etcd/issues/9496).
- Make [Lease `Lookup` non-blocking with concurrent `Grant`/`Revoke`](https://github.com/coreos/etcd/pull/9229).

### Breaking Changes
Expand Down
51 changes: 51 additions & 0 deletions lease/lease_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lease

type LeaseWithTime struct {
leaseId LeaseID
expiration int64
index int
}

type LeaseQueue []*LeaseWithTime

func (pq LeaseQueue) Len() int { return len(pq) }

func (pq LeaseQueue) Less(i, j int) bool {
return pq[i].expiration < pq[j].expiration
}

func (pq LeaseQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}

func (pq *LeaseQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*LeaseWithTime)
item.index = n
*pq = append(*pq, item)
}

func (pq *LeaseQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}
38 changes: 32 additions & 6 deletions lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package lease

import (
"container/heap"
"encoding/binary"
"errors"
"math"
Expand Down Expand Up @@ -128,9 +129,9 @@ type lessor struct {
// We want to make Grant, Revoke, and findExpiredLeases all O(logN) and
// Renew O(1).
// findExpiredLeases and Renew should be the most frequent operations.
leaseMap map[LeaseID]*Lease

itemMap map[LeaseItem]LeaseID
leaseMap map[LeaseID]*Lease
leaseHeap LeaseQueue
itemMap map[LeaseItem]LeaseID

// When a lease expires, the lessor will delete the
// leased range (or key) by the RangeDeleter.
Expand Down Expand Up @@ -159,6 +160,7 @@ func newLessor(b backend.Backend, minLeaseTTL int64) *lessor {
l := &lessor{
leaseMap: make(map[LeaseID]*Lease),
itemMap: make(map[LeaseItem]LeaseID),
leaseHeap: make(LeaseQueue, 0),
b: b,
minLeaseTTL: minLeaseTTL,
// expiredC is a small buffered chan to avoid unnecessary blocking.
Expand Down Expand Up @@ -233,6 +235,8 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
}

le.leaseMap[id] = l
item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()}
heap.Push(&le.leaseHeap, item)
l.persistTo(le.b)

return l, nil
Expand Down Expand Up @@ -315,6 +319,8 @@ func (le *lessor) Renew(id LeaseID) (int64, error) {
}

l.refresh(0)
item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()}
heap.Push(&le.leaseHeap, item)
return l.ttl, nil
}

Expand Down Expand Up @@ -349,6 +355,8 @@ func (le *lessor) Promote(extend time.Duration) {
// refresh the expiries of all leases.
for _, l := range le.leaseMap {
l.refresh(extend)
item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()}
heap.Push(&le.leaseHeap, item)
}

if len(le.leaseMap) < leaseRevokeRate {
Expand Down Expand Up @@ -384,6 +392,8 @@ func (le *lessor) Promote(extend time.Duration) {
delay := time.Duration(rateDelay)
nextWindow = baseWindow + delay
l.refresh(delay + extend)
item := &LeaseWithTime{leaseId: l.ID, expiration: l.expiry.UnixNano()}
heap.Push(&le.leaseHeap, item)
}
}

Expand Down Expand Up @@ -516,9 +526,24 @@ func (le *lessor) runLoop() {
func (le *lessor) findExpiredLeases(limit int) []*Lease {
leases := make([]*Lease, 0, 16)

for _, l := range le.leaseMap {
// TODO: probably should change to <= 100-500 millisecond to
// make up committing latency.
for {
if le.leaseHeap.Len() == 0 {
break
}

item := heap.Pop(&le.leaseHeap).(*LeaseWithTime)
l := le.leaseMap[item.leaseId]
if l == nil {
// lease has expired or been revoked, continue
continue
}
if time.Now().UnixNano() < item.expiration {
// Candidate expirations are caught up, reinsert this item
heap.Push(&le.leaseHeap, item)
break
}
// if the lease is actually expired, add to the removal list. If it is not expired, we can ignore it because another entry will have been inserted into the heap

if l.expired() {
leases = append(leases, l)

Expand Down Expand Up @@ -560,6 +585,7 @@ func (le *lessor) initAndRecover() {
revokec: make(chan struct{}),
}
}
heap.Init(&le.leaseHeap)
tx.Unlock()

le.b.ForceCommit()
Expand Down
121 changes: 121 additions & 0 deletions lease/lessor_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lease

import (
"os"
"testing"

"github.com/coreos/etcd/mvcc/backend"
)

func BenchmarkLessorFindExpired1(b *testing.B) { benchmarkLessorFindExpired(1, b) }
func BenchmarkLessorFindExpired10(b *testing.B) { benchmarkLessorFindExpired(10, b) }
func BenchmarkLessorFindExpired100(b *testing.B) { benchmarkLessorFindExpired(100, b) }
func BenchmarkLessorFindExpired1000(b *testing.B) { benchmarkLessorFindExpired(1000, b) }
func BenchmarkLessorFindExpired10000(b *testing.B) { benchmarkLessorFindExpired(10000, b) }
func BenchmarkLessorFindExpired100000(b *testing.B) { benchmarkLessorFindExpired(100000, b) }
func BenchmarkLessorFindExpired1000000(b *testing.B) { benchmarkLessorFindExpired(1000000, b) }

func BenchmarkLessorGrant1(b *testing.B) { benchmarkLessorGrant(1, b) }
func BenchmarkLessorGrant10(b *testing.B) { benchmarkLessorGrant(10, b) }
func BenchmarkLessorGrant100(b *testing.B) { benchmarkLessorGrant(100, b) }
func BenchmarkLessorGrant1000(b *testing.B) { benchmarkLessorGrant(1000, b) }
func BenchmarkLessorGrant10000(b *testing.B) { benchmarkLessorGrant(10000, b) }
func BenchmarkLessorGrant100000(b *testing.B) { benchmarkLessorGrant(100000, b) }
func BenchmarkLessorGrant1000000(b *testing.B) { benchmarkLessorGrant(1000000, b) }

func BenchmarkLessorRenew1(b *testing.B) { benchmarkLessorRenew(1, b) }
func BenchmarkLessorRenew10(b *testing.B) { benchmarkLessorRenew(10, b) }
func BenchmarkLessorRenew100(b *testing.B) { benchmarkLessorRenew(100, b) }
func BenchmarkLessorRenew1000(b *testing.B) { benchmarkLessorRenew(1000, b) }
func BenchmarkLessorRenew10000(b *testing.B) { benchmarkLessorRenew(10000, b) }
func BenchmarkLessorRenew100000(b *testing.B) { benchmarkLessorRenew(100000, b) }
func BenchmarkLessorRenew1000000(b *testing.B) { benchmarkLessorRenew(1000000, b) }

func BenchmarkLessorRevoke1(b *testing.B) { benchmarkLessorRevoke(1, b) }
func BenchmarkLessorRevoke10(b *testing.B) { benchmarkLessorRevoke(10, b) }
func BenchmarkLessorRevoke100(b *testing.B) { benchmarkLessorRevoke(100, b) }
func BenchmarkLessorRevoke1000(b *testing.B) { benchmarkLessorRevoke(1000, b) }
func BenchmarkLessorRevoke10000(b *testing.B) { benchmarkLessorRevoke(10000, b) }
func BenchmarkLessorRevoke100000(b *testing.B) { benchmarkLessorRevoke(100000, b) }
func BenchmarkLessorRevoke1000000(b *testing.B) { benchmarkLessorRevoke(1000000, b) }

func benchmarkLessorFindExpired(size int, b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
defer le.Stop()
defer cleanup(be, tmpPath)
le.Promote(0)
for i := 0; i < size; i++ {
le.Grant(LeaseID(i), int64(100+i))
}
le.mu.Lock() //Stop the findExpiredLeases call in the runloop
defer le.mu.Unlock()
b.ResetTimer()
for i := 0; i < b.N; i++ {
le.findExpiredLeases(1000)
}
}

func benchmarkLessorGrant(size int, b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
defer le.Stop()
defer cleanup(be, tmpPath)
for i := 0; i < size; i++ {
le.Grant(LeaseID(i), int64(100+i))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
le.Grant(LeaseID(i+size), int64(100+i+size))
}
}

func benchmarkLessorRevoke(size int, b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
defer le.Stop()
defer cleanup(be, tmpPath)
for i := 0; i < size; i++ {
le.Grant(LeaseID(i), int64(100+i))
}
for i := 0; i < b.N; i++ {
le.Grant(LeaseID(i+size), int64(100+i+size))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
le.Revoke(LeaseID(i + size))
}
}

func benchmarkLessorRenew(size int, b *testing.B) {
be, tmpPath := backend.NewDefaultTmpBackend()
le := newLessor(be, minLeaseTTL)
defer le.Stop()
defer cleanup(be, tmpPath)
for i := 0; i < size; i++ {
le.Grant(LeaseID(i), int64(100+i))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
le.Renew(LeaseID(i))
}
}

func cleanup(b backend.Backend, path string) {
b.Close()
os.Remove(path)
}