Skip to content

Commit

Permalink
leasing: initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
visheshnp committed May 18, 2017
1 parent aac2292 commit 6b97b49
Show file tree
Hide file tree
Showing 3 changed files with 354 additions and 0 deletions.
119 changes: 119 additions & 0 deletions clientv3/integration/leasing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// // http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"context"
"testing"

leasing "github.com/coreos/etcd/clientv3/leasing"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
)

func TestLeasingGet(t *testing.T) {
defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)

c1 := clus.Client(0)
c2 := clus.Client(1)
c3 := clus.Client(2)
lKV1, err := leasing.NewleasingKV(c1, "foo/")
lKV2, err := leasing.NewleasingKV(c2, "foo/")
lKV3, err := leasing.NewleasingKV(c3, "foo/")

if err != nil {
t.Fatal(err)
}

/*if _, err := lKV1.Put(context.TODO(), "abc", "bar"); err != nil {
t.Fatal(err)
}*/

resp1, err := lKV1.Get(context.TODO(), "abc")
if err != nil {
t.Fatal(err)
}

//clus.Members[0].InjectPartition(t, clus.Members[1:])

if _, err := lKV2.Put(context.TODO(), "abc", "def"); err != nil {
t.Fatal(err)
}

resp1, err = lKV1.Get(context.TODO(), "abc")
if err != nil {
t.Fatal(err)
}

resp2, err := lKV2.Get(context.TODO(), "abc")
if err != nil {
t.Fatal(err)
}

resp3, err := lKV3.Get(context.TODO(), "abc")
if err != nil {
t.Fatal(err)
}

if _, err := lKV2.Put(context.TODO(), "abc", "ghi"); err != nil {
t.Fatal(err)
}

resp3, err = lKV3.Get(context.TODO(), "abc")
if err != nil {
t.Fatal(err)
}

if string(resp1.Kvs[0].Key) != "abc" {
t.Errorf("expected key=%q, got key=%q", "abc", resp1.Kvs[0].Key)
}

if string(resp1.Kvs[0].Value) != "def" {
t.Errorf("expected value=%q, got value=%q", "bar", resp1.Kvs[0].Value)
}

if string(resp2.Kvs[0].Key) != "abc" {
t.Errorf("expected key=%q, got key=%q", "abc", resp2.Kvs[0].Key)
}

if string(resp2.Kvs[0].Value) != "def" {
t.Errorf("expected value=%q, got value=%q", "bar", resp2.Kvs[0].Value)
}

if string(resp3.Kvs[0].Key) != "abc" {
t.Errorf("expected key=%q, got key=%q", "abc", resp3.Kvs[0].Key)
}

if string(resp3.Kvs[0].Value) != "ghi" {
t.Errorf("expected value=%q, got value=%q", "bar", resp3.Kvs[0].Value)
}
}

func TestLeasingGet2(t *testing.T) {
defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

c := clus.Client(0)
lKV, err := leasing.NewleasingKV(c, "foo/")

_, err = lKV.Get(context.TODO(), "abc")
if err != nil {
t.Fatal(err)
}
}
43 changes: 43 additions & 0 deletions clientv3/leasing/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package namespace is a clientv3 wrapper that translates all keys to begin
// with a given prefix.
//
// First, create a client:
//
// cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
// if err != nil {
// // handle error!
// }
//
// Next, override the client interfaces:
//
// unprefixedKV := cli.KV
// cli.KV = namespace.NewKV(cli.KV, "my-prefix/")
// cli.Watcher = namespace.NewWatcher(cli.Watcher, "my-prefix/")
// cli.Lease = namespace.NewLease(cli.Lease, "my-prefix/")
//
// Now calls using 'cli' will namespace / prefix all keys with "my-prefix/":
//
// cli.Put(context.TODO(), "abc", "123")
// resp, _ := unprefixedKV.Get(context.TODO(), "my-prefix/abc")
// fmt.Printf("%s\n", resp.Kvs[0].Value)
// // Output: 123
// unprefixedKV.Put(context.TODO(), "my-prefix/abc", "456")
// resp, _ = cli.Get("abc")
// fmt.Printf("%s\n", resp.Kvs[0].Value)
// // Output: 456
//
package leasing
192 changes: 192 additions & 0 deletions clientv3/leasing/kv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package leasing

import (
"fmt"

"golang.org/x/net/context"

"sync"

v3 "github.com/coreos/etcd/clientv3"
concurrency "github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
)

type leasingKV struct {
cl *v3.Client
pfx string
session *concurrency.Session
leaseInfomap map[string]leaseInfo
ctx context.Context
cancel context.CancelFunc
mu *sync.Mutex
}

type leaseInfo struct {
resp *v3.GetResponse
revision int64
}

// NewleasingKV wraps a KV instance so that all requests are wired through a leasing protocol.
func NewleasingKV(cl *v3.Client, leasingprefix string) (v3.KV, error) {
s, err := concurrency.NewSession(cl)

if err != nil {
return nil, err
}
cctx, cancel := context.WithCancel(cl.Ctx())
return &leasingKV{cl: cl, pfx: leasingprefix, session: s, leaseInfomap: make(map[string]leaseInfo), ctx: cctx, cancel: cancel, mu: new(sync.Mutex)}, nil
}

func (lkv *leasingKV) Compact(ctx context.Context, rev int64, opts ...v3.CompactOption) (*v3.CompactResponse, error) {
return lkv.cl.Compact(ctx, rev, opts...)
}

func (lkv *leasingKV) Delete(ctx context.Context, key string, opts ...v3.OpOption) (*v3.DeleteResponse, error) {
panic("Stub")
}

func (lkv *leasingKV) Do(ctx context.Context, op v3.Op) (v3.OpResponse, error) {
panic("Stub")
}

func (lkv *leasingKV) Txn(ctx context.Context) v3.Txn {
panic("Stub")
}

func (lkv *leasingKV) Put(ctx context.Context, key, val string, opts ...v3.OpOption) (*v3.PutResponse, error) {

for ctx.Err() == nil {
//if already exist in map, then update key
if _, ok := lkv.leaseInfomap[lkv.pfx+key]; ok {

txnUpd := lkv.cl.Txn(ctx).If(v3.Compare(v3.CreateRevision(lkv.pfx+key), "=", lkv.leaseInfomap[lkv.pfx+key].revision))
txnUpd = txnUpd.Then(v3.OpPut(key, val))
respUpd, errUpd := txnUpd.Commit()

if errUpd != nil {
panic("Error in transaction")
}

if respUpd.Succeeded {
lkv.leaseInfomap[lkv.pfx+key].resp.Kvs[0].Value = []byte(val)
}

} else {

txn := lkv.cl.Txn(ctx).If(v3.Compare(v3.CreateRevision(lkv.pfx+key), "=", 0))
txn = txn.Then(v3.OpPut(key, val))
txn = txn.Else(v3.OpPut(lkv.pfx+key, "REVOKE", v3.WithIgnoreLease()))
resp, err := txn.Commit()

if err != nil {
panic("Error in transaction")
}

if resp.Succeeded {
resput := resp.Responses[0].GetResponsePut()
response := (*v3.PutResponse)(resput)
return response, nil
}

cctx, cancel := context.WithCancel(ctx)
wch := lkv.cl.Watch(cctx, lkv.pfx+key, v3.WithRev(resp.Header.Revision+1))
fmt.Println("Entering loop")
for resp := range wch {
fmt.Printf("%+v\n", resp)
for _, ev := range resp.Events {
if ev.Type == v3.EventTypeDelete {
cancel()
break
}
}
}
}

}
return nil, ctx.Err()
}

func (lkv *leasingKV) Get(ctx context.Context, key string, opts ...v3.OpOption) (*v3.GetResponse, error) {

if len(key) == 0 {
return nil, rpctypes.ErrEmptyKey
}

//return cached value
if li, ok := lkv.leaseInfomap[lkv.pfx+key]; ok {
return li.resp, nil
}

txn := lkv.cl.Txn(ctx).If(v3.Compare(v3.CreateRevision(lkv.pfx+key), "=", 0))
txn = txn.Then(v3.OpGet(key), v3.OpPut(lkv.pfx+key, "", v3.WithLease(lkv.session.Lease())))
txn = txn.Else(v3.OpGet(key))
resp, err := txn.Commit()

if err != nil {
panic("Error in transaction")
}

if resp.Succeeded {
lkv.mu.Lock()
lkv.leaseInfomap[lkv.pfx+key] = leaseInfo{resp: (*v3.GetResponse)(resp.Responses[0].GetResponseRange())}
defer lkv.mu.Unlock()
rev := resp.Header.Revision

go func() {
nctx, cancel := context.WithCancel(lkv.ctx)
defer cancel()
wch := lkv.cl.Watch(nctx, lkv.pfx+key, v3.WithRev(resp.Header.Revision+1))
fmt.Println("Entering loop in Get()")

for resp := range wch {
fmt.Printf("%+v\n", resp)
for _, ev := range resp.Events {
fmt.Printf("%+v\n", ev)
if string(ev.Kv.Value) == "REVOKE" { //if val is entered as revoke
txn := lkv.cl.Txn(nctx).If(v3.Compare(v3.CreateRevision(lkv.pfx+key), "=", rev))
txn = txn.Then(v3.OpDelete(lkv.pfx + key))
delResp, err := txn.Commit()

if err != nil {
//panic("Error")

}

if delResp.Succeeded {
fmt.Println("Delete success")
lkv.mu.Lock()
delete(lkv.leaseInfomap, lkv.pfx+key) //delete from map as well
defer lkv.mu.Unlock()
return
}

//if delResp.!Succeeded {

//}

}
}

}
}()
}

resprange := resp.Responses[0].GetResponseRange()
response := (*v3.GetResponse)(resprange)
return response, nil
}

0 comments on commit 6b97b49

Please sign in to comment.