diff --git a/embed/config.go b/embed/config.go index edbb649b83c..4cddb53bf36 100644 --- a/embed/config.go +++ b/embed/config.go @@ -189,6 +189,9 @@ type Config struct { ExperimentalInitialCorruptCheck bool `json:"experimental-initial-corrupt-check"` ExperimentalCorruptCheckTime time.Duration `json:"experimental-corrupt-check-time"` ExperimentalEnableV2V3 string `json:"experimental-enable-v2v3"` + + PersistExpiry bool `json:"persist-expiry"` + } // configYAML holds the config suitable for yaml parsing @@ -247,6 +250,7 @@ func NewConfig() *Config { Metrics: "basic", EnableV2: DefaultEnableV2, AuthToken: "simple", + PersistExpiry: false, } cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) return cfg diff --git a/embed/etcd.go b/embed/etcd.go index ee3e6dd24fe..65bbfc2f2ad 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -173,6 +173,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { InitialCorruptCheck: cfg.ExperimentalInitialCorruptCheck, CorruptCheckTime: cfg.ExperimentalCorruptCheckTime, Debug: cfg.Debug, + PersistExpiry: cfg.PersistExpiry, } if e.Server, err = etcdserver.NewServer(srvcfg); err != nil { diff --git a/etcdctl/ctlv3/command/snapshot_command.go b/etcdctl/ctlv3/command/snapshot_command.go index 2929cd94e8c..59769823d9f 100644 --- a/etcdctl/ctlv3/command/snapshot_command.go +++ b/etcdctl/ctlv3/command/snapshot_command.go @@ -380,7 +380,7 @@ func makeDB(snapdir, dbfile string, commit int) { // having a new raft instance be := backend.NewDefaultBackend(dbpath) // a lessor never timeouts leases - lessor := lease.NewLessor(be, math.MaxInt64) + lessor := lease.NewLessor(be, math.MaxInt64, false) s := mvcc.NewStore(be, lessor, (*initIndex)(&commit)) txn := s.Write() btx := be.BatchTx() diff --git a/etcdmain/config.go b/etcdmain/config.go index 31241de9c5a..911f032d5e2 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -216,6 +216,9 @@ func newConfig() *config { fs.BoolVar(&cfg.ec.ExperimentalInitialCorruptCheck, "experimental-initial-corrupt-check", cfg.ec.ExperimentalInitialCorruptCheck, "Enable to check data corruption before serving any client/peer traffic.") fs.DurationVar(&cfg.ec.ExperimentalCorruptCheckTime, "experimental-corrupt-check-time", cfg.ec.ExperimentalCorruptCheckTime, "Duration of time between cluster corruption check passes.") + fs.BoolVar(&cfg.ec.PersistExpiry, "persist-expiry", cfg.ec.PersistExpiry, "Persist expiry values for leases.") + + // ignored for _, f := range cfg.ignored { fs.Var(&flags.IgnoredFlag{Name: f}, f, "") diff --git a/etcdserver/config.go b/etcdserver/config.go index 056af745dad..00596dd8c5b 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -72,6 +72,8 @@ type ServerConfig struct { CorruptCheckTime time.Duration Debug bool + + PersistExpiry bool } // VerifyBootstrap sanity-checks the initial config for bootstrap case diff --git a/etcdserver/server.go b/etcdserver/server.go index 281dce91dac..29cf1933fbc 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -443,7 +443,7 @@ func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) { // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers. 
- srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds()))) + srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds())), cfg.PersistExpiry) srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex) if beExist { kvindex := srv.kv.ConsistentIndex() diff --git a/lease/leasehttp/http_test.go b/lease/leasehttp/http_test.go index 367cd8e64a4..ad20b4a2f18 100644 --- a/lease/leasehttp/http_test.go +++ b/lease/leasehttp/http_test.go @@ -32,7 +32,7 @@ func TestRenewHTTP(t *testing.T) { defer os.Remove(tmpPath) defer be.Close() - le := lease.NewLessor(be, int64(5)) + le := lease.NewLessor(be, int64(5), false) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { @@ -56,7 +56,7 @@ func TestTimeToLiveHTTP(t *testing.T) { defer os.Remove(tmpPath) defer be.Close() - le := lease.NewLessor(be, int64(5)) + le := lease.NewLessor(be, int64(5), false) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { @@ -97,7 +97,7 @@ func testApplyTimeout(t *testing.T, f func(*lease.Lease, string) error) { defer os.Remove(tmpPath) defer be.Close() - le := lease.NewLessor(be, int64(5)) + le := lease.NewLessor(be, int64(5), false) le.Promote(time.Second) l, err := le.Grant(1, int64(5)) if err != nil { diff --git a/lease/leasepb/lease.pb.go b/lease/leasepb/lease.pb.go index 4ab93767277..cc55ff81152 100644 --- a/lease/leasepb/lease.pb.go +++ b/lease/leasepb/lease.pb.go @@ -40,8 +40,9 @@ var _ = math.Inf const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package type Lease struct { - ID int64 `protobuf:"varint,1,opt,name=ID,proto3" json:"ID,omitempty"` - TTL int64 `protobuf:"varint,2,opt,name=TTL,proto3" json:"TTL,omitempty"` + ID int64 `protobuf:"varint,1,opt,name=ID,proto3" json:"ID,omitempty"` + TTL int64 `protobuf:"varint,2,opt,name=TTL,proto3" json:"TTL,omitempty"` + Expiry int64 `protobuf:"varint,3,opt,name=Expiry,proto3" json:"Expiry,omitempty"` } func (m *Lease) Reset() { *m = Lease{} } @@ -97,6 +98,11 @@ func (m *Lease) MarshalTo(dAtA []byte) (int, error) { i++ i = encodeVarintLease(dAtA, i, uint64(m.TTL)) } + if m.Expiry != 0 { + dAtA[i] = 0x18 + i++ + i = encodeVarintLease(dAtA, i, uint64(m.Expiry)) + } return i, nil } @@ -174,6 +180,9 @@ func (m *Lease) Size() (n int) { if m.TTL != 0 { n += 1 + sovLease(uint64(m.TTL)) } + if m.Expiry != 0 { + n += 1 + sovLease(uint64(m.Expiry)) + } return n } @@ -277,6 +286,25 @@ func (m *Lease) Unmarshal(dAtA []byte) error { break } } + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Expiry", wireType) + } + m.Expiry = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowLease + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Expiry |= (int64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipLease(dAtA[iNdEx:]) @@ -572,20 +600,21 @@ var ( func init() { proto.RegisterFile("lease.proto", fileDescriptorLease) } var fileDescriptorLease = []byte{ - // 233 bytes of a gzipped FileDescriptorProto + // 249 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xce, 0x49, 0x4d, 0x2c, 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x07, 0x73, 0x0a, 0x92, 0xa4, 0x44, 0xd2, 0xf3, 0xd3, 0xf3, 0xc1, 0x62, 0xfa, 0x20, 0x16, 0x44, 0x5a, 0x4a, 0x2d, 0xb5, 0x24, 0x39, 0x45, 0x1f, 0x44, 0x14, 0xa7, 0x16, 0x95, 0xa5, 0x16, 0x21, 0x31, 0x0b, 0x92, 0xf4, 0x8b, 0x0a, 0x92, - 0x21, 0xea, 0x94, 0x34, 0xb9, 0x58, 
0x7d, 0x40, 0x06, 0x09, 0xf1, 0x71, 0x31, 0x79, 0xba, 0x48, + 0x21, 0xea, 0x94, 0x1c, 0xb9, 0x58, 0x7d, 0x40, 0x06, 0x09, 0xf1, 0x71, 0x31, 0x79, 0xba, 0x48, 0x30, 0x2a, 0x30, 0x6a, 0x30, 0x07, 0x31, 0x79, 0xba, 0x08, 0x09, 0x70, 0x31, 0x87, 0x84, 0xf8, - 0x48, 0x30, 0x81, 0x05, 0x40, 0x4c, 0xa5, 0x12, 0x2e, 0x11, 0xb0, 0x52, 0xcf, 0xbc, 0x92, 0xd4, - 0xa2, 0xbc, 0xc4, 0x9c, 0xa0, 0xd4, 0xc2, 0xd2, 0xd4, 0xe2, 0x12, 0xa1, 0x18, 0x2e, 0x31, 0xb0, - 0x78, 0x48, 0x66, 0x6e, 0x6a, 0x48, 0xbe, 0x4f, 0x66, 0x59, 0x2a, 0x54, 0x06, 0x6c, 0x1a, 0xb7, - 0x91, 0x8a, 0x1e, 0xb2, 0xdd, 0x7a, 0xd8, 0xd5, 0x06, 0xe1, 0x30, 0x43, 0xa9, 0x82, 0x4b, 0x14, - 0xcd, 0xd6, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0xa1, 0x78, 0x2e, 0x71, 0x0c, 0x2d, 0x10, 0x29, - 0xa8, 0xbd, 0xaa, 0x04, 0xec, 0x85, 0x28, 0x0e, 0xc2, 0x65, 0x8a, 0x93, 0xc4, 0x89, 0x87, 0x72, - 0x0c, 0x17, 0x1e, 0xca, 0x31, 0x9c, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, 0x47, - 0x72, 0x8c, 0x33, 0x1e, 0xcb, 0x31, 0x24, 0xb1, 0x81, 0xc3, 0xce, 0x18, 0x10, 0x00, 0x00, 0xff, - 0xff, 0x9f, 0xf2, 0x42, 0xe0, 0x91, 0x01, 0x00, 0x00, + 0x48, 0x30, 0x81, 0x05, 0x40, 0x4c, 0x21, 0x31, 0x2e, 0x36, 0xd7, 0x8a, 0x82, 0xcc, 0xa2, 0x4a, + 0x09, 0x66, 0xb0, 0x20, 0x94, 0xa7, 0x54, 0xc2, 0x25, 0x02, 0x36, 0xc2, 0x33, 0xaf, 0x24, 0xb5, + 0x28, 0x2f, 0x31, 0x27, 0x28, 0xb5, 0xb0, 0x34, 0xb5, 0xb8, 0x44, 0x28, 0x86, 0x4b, 0x0c, 0x2c, + 0x1e, 0x92, 0x99, 0x9b, 0x1a, 0x92, 0xef, 0x93, 0x59, 0x96, 0x0a, 0x95, 0x01, 0xdb, 0xc2, 0x6d, + 0xa4, 0xa2, 0x87, 0xec, 0x26, 0x3d, 0xec, 0x6a, 0x83, 0x70, 0x98, 0xa1, 0x54, 0xc1, 0x25, 0x8a, + 0x66, 0x6b, 0x71, 0x41, 0x7e, 0x5e, 0x71, 0xaa, 0x50, 0x3c, 0x97, 0x38, 0x86, 0x16, 0x88, 0x14, + 0xd4, 0x5e, 0x55, 0x02, 0xf6, 0x42, 0x14, 0x07, 0xe1, 0x32, 0xc5, 0x49, 0xe2, 0xc4, 0x43, 0x39, + 0x86, 0x0b, 0x0f, 0xe5, 0x18, 0x4e, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, + 0x39, 0xc6, 0x19, 0x8f, 0xe5, 0x18, 0x92, 0xd8, 0xc0, 0x61, 0x6a, 0x0c, 0x08, 0x00, 0x00, 0xff, + 0xff, 0x0c, 0xce, 0xf7, 0x50, 0xa9, 0x01, 0x00, 0x00, } diff --git a/lease/leasepb/lease.proto b/lease/leasepb/lease.proto index be414b993ed..1b83c39daec 100644 --- a/lease/leasepb/lease.proto +++ b/lease/leasepb/lease.proto @@ -13,6 +13,7 @@ option (gogoproto.goproto_enum_prefix_all) = false; message Lease { int64 ID = 1; int64 TTL = 2; + int64 Expiry = 3; } message LeaseInternalRequest { diff --git a/lease/lessor.go b/lease/lessor.go index 2b56a36e03f..5fc821f72be 100644 --- a/lease/lessor.go +++ b/lease/lessor.go @@ -150,13 +150,15 @@ type lessor struct { stopC chan struct{} // doneC is a channel whose closure indicates that the lessor is stopped. 
doneC chan struct{} + + persistExpiry bool } -func NewLessor(b backend.Backend, minLeaseTTL int64) Lessor { - return newLessor(b, minLeaseTTL) +func NewLessor(b backend.Backend, minLeaseTTL int64, persistExpiry bool) Lessor { + return newLessor(b, minLeaseTTL, persistExpiry) } -func newLessor(b backend.Backend, minLeaseTTL int64) *lessor { +func newLessor(b backend.Backend, minLeaseTTL int64, persistExpiry bool) *lessor { l := &lessor{ leaseMap: make(map[LeaseID]*Lease), itemMap: make(map[LeaseItem]LeaseID), @@ -167,7 +169,9 @@ func newLessor(b backend.Backend, minLeaseTTL int64) *lessor { expiredC: make(chan []*Lease, 16), stopC: make(chan struct{}), doneC: make(chan struct{}), + persistExpiry: persistExpiry, } + l.initAndRecover() go l.runLoop() @@ -228,7 +232,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { l.ttl = le.minLeaseTTL } - if le.isPrimary() { + if le.isPrimary() || le.persistExpiry { l.refresh(0) } else { l.forever() @@ -237,7 +241,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { le.leaseMap[id] = l item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()} heap.Push(&le.leaseHeap, item) - l.persistTo(le.b) + l.persistTo(le.b, le.persistExpiry) return l, nil } @@ -321,6 +325,9 @@ func (le *lessor) Renew(id LeaseID) (int64, error) { l.refresh(0) item := &LeaseWithTime{id: l.ID, expiration: l.expiry.UnixNano()} heap.Push(&le.leaseHeap, item) + if le.persistExpiry { + l.persistTo(le.b, le.persistExpiry) + } return l.ttl, nil } @@ -351,7 +358,9 @@ func (le *lessor) Promote(extend time.Duration) { defer le.mu.Unlock() le.demotec = make(chan struct{}) - + if le.persistExpiry { + return + } // refresh the expiries of all leases. for _, l := range le.leaseMap { l.refresh(extend) @@ -407,9 +416,11 @@ func (le *lessor) Demote() { le.mu.Lock() defer le.mu.Unlock() - // set the expiries of all leases to forever - for _, l := range le.leaseMap { - l.forever() + if !le.persistExpiry { + // set the expiries of all leases to forever + for _, l := range le.leaseMap { + l.forever() + } } if le.demotec != nil { @@ -597,13 +608,19 @@ func (le *lessor) initAndRecover() { if lpb.TTL < le.minLeaseTTL { lpb.TTL = le.minLeaseTTL } + var expiry time.Time + if le.persistExpiry && lpb.Expiry != 0 { + expiry = time.Unix(0, lpb.Expiry) + } else { + expiry = forever + } le.leaseMap[ID] = &Lease{ ID: ID, ttl: lpb.TTL, // itemSet will be filled in when recover key-value pairs // set expiry to forever, refresh when promoted itemSet: make(map[LeaseItem]struct{}), - expiry: forever, + expiry: expiry, revokec: make(chan struct{}), } } @@ -631,10 +648,14 @@ func (l *Lease) expired() bool { return l.Remaining() <= 0 } -func (l *Lease) persistTo(b backend.Backend) { +func (l *Lease) persistTo(b backend.Backend, expiry bool) { key := int64ToBytes(int64(l.ID)) - - lpb := leasepb.Lease{ID: int64(l.ID), TTL: int64(l.ttl)} + var lpb leasepb.Lease + if expiry { + lpb = leasepb.Lease{ID: int64(l.ID), TTL: int64(l.ttl), Expiry: l.expiry.UnixNano()} + } else { + lpb = leasepb.Lease{ID: int64(l.ID), TTL: int64(l.ttl)} + } val, err := lpb.Marshal() if err != nil { panic("failed to marshal lease proto item") diff --git a/lease/lessor_bench_test.go b/lease/lessor_bench_test.go index 063c5519f29..e7092e36ade 100644 --- a/lease/lessor_bench_test.go +++ b/lease/lessor_bench_test.go @@ -54,7 +54,7 @@ func BenchmarkLessorRevoke1000000(b *testing.B) { benchmarkLessorRevoke(1000000 func benchmarkLessorFindExpired(size int, b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - le := 
newLessor(be, minLeaseTTL) + le := newLessor(be, minLeaseTTL, false) defer le.Stop() defer cleanup(be, tmpPath) le.Promote(0) @@ -71,7 +71,7 @@ func benchmarkLessorFindExpired(size int, b *testing.B) { func benchmarkLessorGrant(size int, b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - le := newLessor(be, minLeaseTTL) + le := newLessor(be, minLeaseTTL, false) defer le.Stop() defer cleanup(be, tmpPath) for i := 0; i < size; i++ { @@ -85,7 +85,7 @@ func benchmarkLessorGrant(size int, b *testing.B) { func benchmarkLessorRevoke(size int, b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - le := newLessor(be, minLeaseTTL) + le := newLessor(be, minLeaseTTL, false) defer le.Stop() defer cleanup(be, tmpPath) for i := 0; i < size; i++ { @@ -102,7 +102,7 @@ func benchmarkLessorRevoke(size int, b *testing.B) { func benchmarkLessorRenew(size int, b *testing.B) { be, tmpPath := backend.NewDefaultTmpBackend() - le := newLessor(be, minLeaseTTL) + le := newLessor(be, minLeaseTTL, false) defer le.Stop() defer cleanup(be, tmpPath) for i := 0; i < size; i++ { diff --git a/lease/lessor_test.go b/lease/lessor_test.go index 3a39e846f72..766fdf46df8 100644 --- a/lease/lessor_test.go +++ b/lease/lessor_test.go @@ -41,7 +41,7 @@ func TestLessorGrant(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(be, minLeaseTTL) + le := newLessor(be, minLeaseTTL, false) defer le.Stop() le.Promote(0) @@ -95,6 +95,29 @@ func TestLessorGrant(t *testing.T) { be.BatchTx().Unlock() } +// TestLessorGrantPersistExpiry ensures Lessor persists the expiry of a granted lease. +func TestLessorGrantPersistExpiry(t *testing.T) { + dir, be := NewTestBackend(t) + defer be.Close() + defer os.RemoveAll(dir) + + le := newLessor(be, minLeaseTTL, true) + defer le.Stop() + le.Promote(0) + + l, err := le.Grant(1, minLeaseTTL) + if err != nil { + t.Fatalf("failed to grant lease (%v)", err) + } + + nle := newLessor(be, minLeaseTTL, true) + + nl := nle.Lookup(l.ID) + if !l.expiry.Equal(nl.expiry) { + t.Errorf("Time remaining was not persisted. Original: %v, New: %v", l.expiry.Round(0), nl.expiry.Round(0)) + } +} + // TestLeaseConcurrentKeys ensures Lease.Keys method calls are guarded // from concurrent map writes on 'itemSet'.
func TestLeaseConcurrentKeys(t *testing.T) { @@ -102,7 +125,7 @@ func TestLeaseConcurrentKeys(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(be, minLeaseTTL) + le := newLessor(be, minLeaseTTL, false) defer le.Stop() le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) }) @@ -150,7 +173,7 @@ func TestLessorRevoke(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(be, minLeaseTTL) + le := newLessor(be, minLeaseTTL, false) defer le.Stop() var fd *fakeDeleter le.SetRangeDeleter(func() TxnDelete { @@ -202,7 +225,7 @@ func TestLessorRenew(t *testing.T) { defer be.Close() defer os.RemoveAll(dir) - le := newLessor(be, minLeaseTTL) + le := newLessor(be, minLeaseTTL, false) defer le.Stop() le.Promote(0) @@ -229,6 +252,37 @@ func TestLessorRenew(t *testing.T) { } } +// TestLessorRenewPersistExpiry ensures Lessor can renew an existing lease and persists the new expiration +func TestLessorRenewPersistExpiry(t *testing.T) { + dir, be := NewTestBackend(t) + defer be.Close() + defer os.RemoveAll(dir) + + le := newLessor(be, minLeaseTTL, true) + defer le.Stop() + le.Promote(0) + + l, err := le.Grant(1, minLeaseTTL) + if err != nil { + t.Fatalf("failed to grant lease (%v)", err) + } + + ttl, err := le.Renew(l.ID) + if err != nil { + t.Fatalf("failed to renew lease (%v)", err) + } + if ttl != l.ttl { + t.Errorf("ttl = %d, want %d", ttl, l.ttl) + } + + nle := newLessor(be, minLeaseTTL, true) + + nl := nle.Lookup(l.ID) + if !l.expiry.Equal(nl.expiry) { + t.Errorf("Time remaining was not persisted. Original: %v, New: %v", l.expiry.Round(0), nl.expiry.Round(0)) + } +} + // TestLessorRenewExtendPileup ensures Lessor extends leases on promotion if too many // expire at the same time. func TestLessorRenewExtendPileup(t *testing.T) { @@ -239,7 +293,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { dir, be := NewTestBackend(t) defer os.RemoveAll(dir) - le := newLessor(be, minLeaseTTL) + le := newLessor(be, minLeaseTTL, false) ttl := int64(10) for i := 1; i <= leaseRevokeRate*10; i++ { if _, err := le.Grant(LeaseID(2*i), ttl); err != nil { @@ -258,7 +312,7 @@ func TestLessorRenewExtendPileup(t *testing.T) { bcfg.Path = filepath.Join(dir, "be") be = backend.New(bcfg) defer be.Close() - le = newLessor(be, minLeaseTTL) + le = newLessor(be, minLeaseTTL, false) defer le.Stop() // extend after recovery should extend expiration on lease pile-up @@ -282,12 +336,53 @@ func TestLessorRenewExtendPileup(t *testing.T) { } } + +//TestLessorPersistExpiryPromote ensures that on promotion, lease expirations are not redistributed +//regardless of revoke rate +func TestLessorPersistExpiryPromote(t *testing.T) { + oldRevokeRate := leaseRevokeRate + defer func() { leaseRevokeRate = oldRevokeRate }() + leaseRevokeRate = 10 + + dir, be := NewTestBackend(t) + defer os.RemoveAll(dir) + + le := newLessor(be, minLeaseTTL, true) + ttl := int64(10) + expiryMap := make(map[LeaseID]time.Time) + for i := 1; i <= leaseRevokeRate*10; i++ { + l, err := le.Grant(LeaseID(2*i), ttl) + if err != nil { + t.Fatal(err) + } + expiryMap[l.ID] = l.expiry + } + + // simulate stop and recovery + le.Stop() + be.Close() + bcfg := backend.DefaultBackendConfig() + bcfg.Path = filepath.Join(dir, "be") + be = backend.New(bcfg) + defer be.Close() + le = newLessor(be, minLeaseTTL, true) + defer le.Stop() + + le.Promote(0) + + for _, l := range le.leaseMap { + if !expiryMap[l.ID].Equal(l.expiry) { + t.Errorf("expected expiry to not change for %v, got %v, want %v", l.ID, l.expiry, expiryMap[l.ID]) + } + } +} 
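The tests above drive the lessor package directly. End to end, the behaviour is switched on through the persist-expiry knob wired through embed.Config, etcdmain, and etcdserver.ServerConfig earlier in this diff. Below is a minimal, illustrative sketch of an embedded member opting in; it is not part of the change and assumes the embed API already present in this tree — only the PersistExpiry field is new, and the data directory name is a placeholder.

package main

import (
	"log"

	"github.com/coreos/etcd/embed"
)

func main() {
	cfg := embed.NewConfig()
	cfg.Dir = "default.etcd" // placeholder data directory
	// New knob from this change (defaults to false): also write the absolute
	// lease expiry to the backend so remaining TTLs survive restarts and
	// leader changes instead of being reset to the full TTL on promotion.
	cfg.PersistExpiry = true

	e, err := embed.StartEtcd(cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer e.Close()
	<-e.Server.ReadyNotify()
	log.Println("member ready; lease Grant/Renew now also store leasepb.Lease.Expiry")
}

The same switch is exposed on the command line as --persist-expiry (see the etcdmain/config.go hunk above).
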
+ func TestLessorDetach(t *testing.T) { dir, be := NewTestBackend(t) defer os.RemoveAll(dir) defer be.Close() - le := newLessor(be, minLeaseTTL) + le := newLessor(be, minLeaseTTL, false) defer le.Stop() le.SetRangeDeleter(func() TxnDelete { return newFakeDeleter(be) }) @@ -327,7 +422,7 @@ func TestLessorRecover(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(be, minLeaseTTL) + le := newLessor(be, minLeaseTTL, false) defer le.Stop() l1, err1 := le.Grant(1, 10) l2, err2 := le.Grant(2, 20) @@ -336,7 +431,7 @@ } // Create a new lessor with the same backend - nle := newLessor(be, minLeaseTTL) + nle := newLessor(be, minLeaseTTL, false) defer nle.Stop() nl1 := nle.Lookup(l1.ID) if nl1 == nil || nl1.ttl != l1.ttl { @@ -349,6 +444,63 @@ } } +// TestLessorRecoverWithExpiry ensures Lessor recovers leases +// including persisted expiry from backend. +func TestLessorRecoverWithExpiry(t *testing.T) { + dir, be := NewTestBackend(t) + defer os.RemoveAll(dir) + defer be.Close() + + le := newLessor(be, minLeaseTTL, true) + defer le.Stop() + l1, err1 := le.Grant(1, 10) + if err1 != nil { + t.Fatalf("could not grant initial lease (%v)", err1) + } + + // Create a new lessor with the same backend + nle := newLessor(be, minLeaseTTL, true) + defer nle.Stop() + nl1 := nle.Lookup(l1.ID) + if nl1 == nil || nl1.ttl != l1.ttl { + t.Errorf("nl1 = %v, want nl1.ttl= %d", nl1.ttl, l1.ttl) + } + + if nl1 == nil || !nl1.expiry.Equal(l1.expiry) { + t.Errorf("nl1.expiry = %v, want nl1.expiry= %v", nl1.expiry, l1.expiry) + } + + if nl1 == nil || nl1.expiry == forever { + t.Errorf("nl1.expiry = %v (forever)", nl1.expiry) + } +} + +func TestLessorRecoverWithExpiryPersistChanging(t *testing.T) { + dir, be := NewTestBackend(t) + defer os.RemoveAll(dir) + defer be.Close() + + le := newLessor(be, minLeaseTTL, false) + defer le.Stop() + l1, err1 := le.Grant(1, 10) + if err1 != nil { + t.Fatalf("could not grant initial lease (%v)", err1) + } + + // Create a new lessor with the same backend, but persist set to true + nle := newLessor(be, minLeaseTTL, true) + defer nle.Stop() + nl1 := nle.Lookup(l1.ID) + + if nl1 == nil || !nl1.expiry.Equal(l1.expiry) { + t.Errorf("nl1.expiry = %v, want nl1.expiry= %v", nl1.expiry, l1.expiry) + } + + if nl1 == nil || !nl1.expiry.Equal(forever) { + t.Errorf("nl1.expiry is not forever (%v), expected pre-persist leases to not have an expiry", nl1.expiry) + } +} + func TestLessorExpire(t *testing.T) { dir, be := NewTestBackend(t) defer os.RemoveAll(dir) @@ -356,7 +508,7 @@ testMinTTL := int64(1) - le := newLessor(be, testMinTTL) + le := newLessor(be, testMinTTL, false) defer le.Stop() le.Promote(1 * time.Second) @@ -408,7 +560,7 @@ func TestLessorExpireAndDemote(t *testing.T) { testMinTTL := int64(1) - le := newLessor(be, testMinTTL) + le := newLessor(be, testMinTTL, false) defer le.Stop() le.Promote(1 * time.Second) @@ -456,7 +608,7 @@ func TestLessorMaxTTL(t *testing.T) { defer os.RemoveAll(dir) defer be.Close() - le := newLessor(be, minLeaseTTL) + le := newLessor(be, minLeaseTTL, false) defer le.Stop() _, err := le.Grant(1, MaxLeaseTTL+1) diff --git a/snapshot/v3_snapshot.go b/snapshot/v3_snapshot.go new file mode 100644 index 00000000000..c589745ed9b --- /dev/null +++ b/snapshot/v3_snapshot.go @@ -0,0 +1,438 @@ +// Copyright 2018 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in 
compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package snapshot + +import ( + "context" + "crypto/sha256" + "encoding/json" + "fmt" + "hash/crc32" + "io" + "math" + "os" + "path/filepath" + "reflect" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/etcdserver/etcdserverpb" + "github.com/coreos/etcd/etcdserver/membership" + "github.com/coreos/etcd/etcdserver/v2store" + "github.com/coreos/etcd/lease" + "github.com/coreos/etcd/mvcc" + "github.com/coreos/etcd/mvcc/backend" + "github.com/coreos/etcd/pkg/fileutil" + "github.com/coreos/etcd/pkg/logutil" + "github.com/coreos/etcd/pkg/types" + "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" + "github.com/coreos/etcd/raftsnap" + "github.com/coreos/etcd/wal" + "github.com/coreos/etcd/wal/walpb" + + bolt "github.com/coreos/bbolt" +) + +// Manager defines snapshot methods. +type Manager interface { + // Save fetches snapshot from remote etcd server and saves data to target path. + // If the context "ctx" is canceled or timed out, snapshot save stream will error out + // (e.g. context.Canceled, context.DeadlineExceeded). + Save(ctx context.Context, dbPath string) error + + // Status returns the snapshot file information. + Status(dbPath string) (Status, error) + + // Restore restores a new etcd data directory from given snapshot file. + // It returns an error if specified data directory already exists, to + // prevent unintended data directory overwrites. + Restore(dbPath string, cfg RestoreConfig) error +} + +// Status is the snapshot file status. +type Status struct { + Hash uint32 `json:"hash"` + Revision int64 `json:"revision"` + TotalKey int `json:"totalKey"` + TotalSize int64 `json:"totalSize"` +} + +// RestoreConfig configures snapshot restore operation. +type RestoreConfig struct { + // Name is the human-readable name of this member. + Name string + // OutputDataDir is the target data directory to save restored data. + // OutputDataDir should not conflict with existing etcd data directory. + // If OutputDataDir already exists, it will return an error to prevent + // unintended data directory overwrites. + // Defaults to "[Name].etcd" if not given. + OutputDataDir string + // OutputWALDir is the target WAL data directory. + // Defaults to "[OutputDataDir]/member/wal" if not given. + OutputWALDir string + // InitialCluster is the initial cluster configuration for restore bootstrap. + InitialCluster types.URLsMap + // InitialClusterToken is the initial cluster token for etcd cluster during restore bootstrap. + InitialClusterToken string + // PeerURLs is a list of member's peer URLs to advertise to the rest of the cluster. + PeerURLs types.URLs + // SkipHashCheck is "true" to ignore snapshot integrity hash value + // (required if copied from data directory). + SkipHashCheck bool +} + +// NewV3 returns a new snapshot Manager for v3.x snapshot. +// "*clientv3.Client" is only used for "Save" method. +// Otherwise, pass "nil". 
+func NewV3(cli *clientv3.Client, lg logutil.Logger) Manager { + if lg == nil { + lg = logutil.NewDiscardLogger() + } + return &v3Manager{cli: cli, logger: lg} +} + +type v3Manager struct { + cli *clientv3.Client + + name string + dbPath string + walDir string + snapDir string + cl *membership.RaftCluster + + skipHashCheck bool + logger logutil.Logger +} + +func (s *v3Manager) Save(ctx context.Context, dbPath string) error { + partpath := dbPath + ".part" + f, err := os.Create(partpath) + if err != nil { + os.RemoveAll(partpath) + return fmt.Errorf("could not open %s (%v)", partpath, err) + } + s.logger.Infof("created temporary db file %q", partpath) + + var rd io.ReadCloser + rd, err = s.cli.Snapshot(ctx) + if err != nil { + os.RemoveAll(partpath) + return err + } + s.logger.Infof("copying from snapshot stream") + if _, err = io.Copy(f, rd); err != nil { + os.RemoveAll(partpath) + return err + } + if err = fileutil.Fsync(f); err != nil { + os.RemoveAll(partpath) + return err + } + if err = f.Close(); err != nil { + os.RemoveAll(partpath) + return err + } + + s.logger.Infof("renaming from %q to %q", partpath, dbPath) + if err = os.Rename(partpath, dbPath); err != nil { + os.RemoveAll(partpath) + return fmt.Errorf("could not rename %s to %s (%v)", partpath, dbPath, err) + } + return nil +} + +func (s *v3Manager) Status(dbPath string) (ds Status, err error) { + if _, err = os.Stat(dbPath); err != nil { + return ds, err + } + + db, err := bolt.Open(dbPath, 0400, &bolt.Options{ReadOnly: true}) + if err != nil { + return ds, err + } + defer db.Close() + + h := crc32.New(crc32.MakeTable(crc32.Castagnoli)) + + if err = db.View(func(tx *bolt.Tx) error { + ds.TotalSize = tx.Size() + c := tx.Cursor() + for next, _ := c.First(); next != nil; next, _ = c.Next() { + b := tx.Bucket(next) + if b == nil { + return fmt.Errorf("cannot get hash of bucket %s", string(next)) + } + h.Write(next) + iskeyb := (string(next) == "key") + b.ForEach(func(k, v []byte) error { + h.Write(k) + h.Write(v) + if iskeyb { + rev := bytesToRev(k) + ds.Revision = rev.main + } + ds.TotalKey++ + return nil + }) + } + return nil + }); err != nil { + return ds, err + } + + ds.Hash = h.Sum32() + return ds, nil +} + +func (s *v3Manager) Restore(dbPath string, cfg RestoreConfig) error { + srv := etcdserver.ServerConfig{ + Name: cfg.Name, + InitialClusterToken: cfg.InitialClusterToken, + InitialPeerURLsMap: cfg.InitialCluster, + PeerURLs: cfg.PeerURLs, + } + if err := srv.VerifyBootstrap(); err != nil { + return err + } + + var err error + s.cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialCluster) + if err != nil { + return err + } + + dataDir := cfg.OutputDataDir + if dataDir == "" { + dataDir = cfg.Name + ".etcd" + } + if _, err = os.Stat(dataDir); err == nil { + return fmt.Errorf("data-dir %q exists", dataDir) + } + walDir := cfg.OutputWALDir + if walDir == "" { + walDir = filepath.Join(dataDir, "member", "wal") + } else if _, err = os.Stat(walDir); err == nil { + return fmt.Errorf("wal-dir %q exists", walDir) + } + s.logger.Infof("restoring snapshot file %q to data-dir %q, wal-dir %q", dbPath, dataDir, walDir) + + s.name = cfg.Name + s.dbPath = dbPath + s.walDir = walDir + s.snapDir = filepath.Join(dataDir, "member", "snap") + s.skipHashCheck = cfg.SkipHashCheck + + s.logger.Infof("writing snapshot directory %q", s.snapDir) + if err = s.saveDB(); err != nil { + return err + } + s.logger.Infof("writing WAL directory %q and raft snapshot to %q", s.walDir, s.snapDir) + err = s.saveWALAndSnap() + if err == 
nil { + s.logger.Infof("finished restore %q to data directory %q, wal directory %q", dbPath, dataDir, walDir) + } + return err +} + +// saveDB copies the database snapshot to the snapshot directory +func (s *v3Manager) saveDB() error { + f, ferr := os.OpenFile(s.dbPath, os.O_RDONLY, 0600) + if ferr != nil { + return ferr + } + defer f.Close() + + // get snapshot integrity hash + if _, err := f.Seek(-sha256.Size, io.SeekEnd); err != nil { + return err + } + sha := make([]byte, sha256.Size) + if _, err := f.Read(sha); err != nil { + return err + } + if _, err := f.Seek(0, io.SeekStart); err != nil { + return err + } + + if err := fileutil.CreateDirAll(s.snapDir); err != nil { + return err + } + + dbpath := filepath.Join(s.snapDir, "db") + db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600) + if dberr != nil { + return dberr + } + if _, err := io.Copy(db, f); err != nil { + return err + } + + // truncate away integrity hash, if any. + off, serr := db.Seek(0, io.SeekEnd) + if serr != nil { + return serr + } + hasHash := (off % 512) == sha256.Size + if hasHash { + if err := db.Truncate(off - sha256.Size); err != nil { + return err + } + } + + if !hasHash && !s.skipHashCheck { + return fmt.Errorf("snapshot missing hash but --skip-hash-check=false") + } + + if hasHash && !s.skipHashCheck { + // check for match + if _, err := db.Seek(0, io.SeekStart); err != nil { + return err + } + h := sha256.New() + if _, err := io.Copy(h, db); err != nil { + return err + } + dbsha := h.Sum(nil) + if !reflect.DeepEqual(sha, dbsha) { + return fmt.Errorf("expected sha256 %v, got %v", sha, dbsha) + } + } + + // db hash is OK, can now modify DB so it can be part of a new cluster + db.Close() + + commit := len(s.cl.Members()) + + // update consistentIndex so applies go through on etcdserver despite + // having a new raft instance + be := backend.NewDefaultBackend(dbpath) + + // a lessor never timeouts leases + lessor := lease.NewLessor(be, math.MaxInt64, false) + + mvs := mvcc.NewStore(be, lessor, (*initIndex)(&commit)) + txn := mvs.Write() + btx := be.BatchTx() + del := func(k, v []byte) error { + txn.DeleteRange(k, nil) + return nil + } + + // delete stored members from old cluster since using new members + btx.UnsafeForEach([]byte("members"), del) + + // todo: add back new members when we start to deprecate old snap file. + btx.UnsafeForEach([]byte("members_removed"), del) + + // trigger write-out of new consistent index + txn.End() + + mvs.Commit() + mvs.Close() + be.Close() + + return nil +} + +// saveWALAndSnap creates a WAL for the initial cluster +func (s *v3Manager) saveWALAndSnap() error { + if err := fileutil.CreateDirAll(s.walDir); err != nil { + return err + } + + // add members again to persist them to the store we create. 
+ st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix) + s.cl.SetStore(st) + for _, m := range s.cl.Members() { + s.cl.AddMember(m) + } + + m := s.cl.MemberByName(s.name) + md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(s.cl.ID())} + metadata, merr := md.Marshal() + if merr != nil { + return merr + } + w, walerr := wal.Create(s.walDir, metadata) + if walerr != nil { + return walerr + } + defer w.Close() + + peers := make([]raft.Peer, len(s.cl.MemberIDs())) + for i, id := range s.cl.MemberIDs() { + ctx, err := json.Marshal((*s.cl).Member(id)) + if err != nil { + return err + } + peers[i] = raft.Peer{ID: uint64(id), Context: ctx} + } + + ents := make([]raftpb.Entry, len(peers)) + nodeIDs := make([]uint64, len(peers)) + for i, p := range peers { + nodeIDs[i] = p.ID + cc := raftpb.ConfChange{ + Type: raftpb.ConfChangeAddNode, + NodeID: p.ID, + Context: p.Context, + } + d, err := cc.Marshal() + if err != nil { + return err + } + ents[i] = raftpb.Entry{ + Type: raftpb.EntryConfChange, + Term: 1, + Index: uint64(i + 1), + Data: d, + } + } + + commit, term := uint64(len(ents)), uint64(1) + if err := w.Save(raftpb.HardState{ + Term: term, + Vote: peers[0].ID, + Commit: commit, + }, ents); err != nil { + return err + } + + b, berr := st.Save() + if berr != nil { + return berr + } + raftSnap := raftpb.Snapshot{ + Data: b, + Metadata: raftpb.SnapshotMetadata{ + Index: commit, + Term: term, + ConfState: raftpb.ConfState{ + Nodes: nodeIDs, + }, + }, + } + sn := raftsnap.New(s.snapDir) + if err := sn.SaveSnap(raftSnap); err != nil { + return err + } + + err := w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}) + if err == nil { + s.logger.Infof("wrote WAL snapshot to %q", s.walDir) + } + return err +}
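
For orientation, a minimal sketch of how the Manager defined above might be driven; it uses only the Save/Status/Restore methods and RestoreConfig fields declared in this file, and passing a nil logger falls back to the discard logger, as NewV3 does. The endpoint, file name, member name, and URLs are placeholders rather than values taken from this change.

package main

import (
	"context"
	"log"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/pkg/types"
	"github.com/coreos/etcd/snapshot"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	mgr := snapshot.NewV3(cli, nil) // nil logger falls back to a discard logger

	// Stream a snapshot from the cluster to disk.
	if err = mgr.Save(context.Background(), "backup.db"); err != nil {
		log.Fatal(err)
	}

	// Inspect the snapshot file.
	st, err := mgr.Status("backup.db")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("hash=%08x revision=%d keys=%d size=%dB", st.Hash, st.Revision, st.TotalKey, st.TotalSize)

	// Restore a fresh single-member data directory from it; Restore refuses
	// to overwrite an existing data directory.
	icluster, _ := types.NewURLsMap("m1=http://127.0.0.1:2380")
	purls, _ := types.NewURLs([]string{"http://127.0.0.1:2380"})
	if err = mgr.Restore("backup.db", snapshot.RestoreConfig{
		Name:                "m1",
		OutputDataDir:       "m1.etcd",
		InitialCluster:      icluster,
		InitialClusterToken: "etcd-cluster",
		PeerURLs:            purls,
	}); err != nil {
		log.Fatal(err)
	}
}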