Skip to content

Commit

Permalink
clientv3/concurrency: allow election on prefixes of keys.
Browse files Browse the repository at this point in the history
After winning an election or obtaining a lock, we
auto-append a slash after the provided key prefix.
This avoids the previous deadlock due to waiting
on the wrong key.

Fixes etcd-io#6278
  • Loading branch information
glycerine committed Aug 29, 2016
1 parent c388b2f commit aa7e863
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 5 deletions.
5 changes: 2 additions & 3 deletions clientv3/concurrency/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Election struct {

// NewElection returns a new election on a given key prefix.
func NewElection(s *Session, pfx string) *Election {
return &Election{session: s, keyPrefix: pfx}
return &Election{session: s, keyPrefix: pfx + "/"}
}

// Campaign puts a value as eligible for the election. It blocks until
Expand All @@ -49,15 +49,14 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
s := e.session
client := e.session.Client()

k := fmt.Sprintf("%s/%x", e.keyPrefix, s.Lease())
k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
txn = txn.Else(v3.OpGet(k))
resp, err := txn.Commit()
if err != nil {
return err
}

e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
if !resp.Succeeded {
kv := resp.Responses[0].GetResponseRange().Kvs[0]
Expand Down
4 changes: 2 additions & 2 deletions clientv3/concurrency/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type Mutex struct {
}

func NewMutex(s *Session, pfx string) *Mutex {
return &Mutex{s, pfx, "", -1}
return &Mutex{s, pfx + "/", "", -1}
}

// Lock locks the mutex with a cancellable context. If the context is cancelled
Expand All @@ -41,7 +41,7 @@ func (m *Mutex) Lock(ctx context.Context) error {
s := m.s
client := m.s.Client()

m.myKey = fmt.Sprintf("%s/%x", m.pfx, s.Lease())
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
// put self in lock waiters via myKey; oldest waiter holds lock
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
Expand Down
160 changes: 160 additions & 0 deletions integration/v3_election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,163 @@ func TestElectionSessionRecampaign(t *testing.T) {
t.Fatalf("expected value=%q, got response %v", "def", resp)
}
}

// TestElectionOnPrefixOfExistingKey checks that a single
// candidate can be elected on a new key that is a prefix
// of an existing key. To wit, check for regression
// of bug #6278. https://github.com/coreos/etcd/issues/6278
//
func TestElectionOnPrefixOfExistingKey(t *testing.T) {
keyPrefix := "test"
fullKey := keyPrefix + "election"
arbitraryValue := "whatever"

clus := NewClusterV3(t, &ClusterConfig{Size: 3})
defer clus.Terminate(t)

var clients []*clientv3.Client
newClient := makeMultiNodeClients(t, clus.cluster, &clients)

// setup test: create fullKey: a key that extends our prefix
cli := newClient()
err := putCliKeyValHelper(cli, fullKey, arbitraryValue)
if err != nil {
t.Fatalf("could not set initial key during test setup: '%s'", err)
}

chk, err := getCliKeyHelper(cli, fullKey)
if err != nil {
t.Fatalf("could not get initial key during test setup: '%s'", err)
}
if chk != arbitraryValue {
t.Fatalf("could not set fullKey '%s' to value '%s'; instead chk = '%s'", fullKey, arbitraryValue, chk)
}
// okay, setup done, fullKey is set.
fmt.Printf("Test setup done.\n")
// now run a single candidate election on keyPrefix

electedc := make(chan string)

// waitTime is how long we wait before declaring
// a regression / the bug has come back.
// Under the correct/fast-path we never wait this long.
waitTime := 5000 * time.Millisecond

followerDone := make(chan struct{})
leaderDone := make(chan struct{})

// because t.Fatalf must be called on original goroutine, use failMsg.
failMsg := make(chan string)

// observe the election
go func() {
defer close(followerDone)
session, err := concurrency.NewSession(newClient())
if err != nil {
failMsg <- fmt.Sprintf("%v", err)
return
}
b := concurrency.NewElection(session, keyPrefix)

cctx, cancel := context.WithCancel(context.TODO())
defer cancel()
var s clientv3.GetResponse
select {
case s = <-b.Observe(cctx):
// good, election actually happened.
v2 := s.Kvs[0].Value

// if v2 is 'whatever' then we are in trouble!
// leader will fail out in this case.
fmt.Printf("Observer of election observed: '%s'.\n", v2)

select {
case electedc <- string(v2):
// good, on success path so far.
return
case <-leaderDone:
// leader bailed early, so should we.
return
case <-time.After(waitTime):
fmt.Printf("could not observe election after wait time %v\n", waitTime)
// expected failure here without the bug fix
failMsg <- fmt.Sprintf("could not observe election after wait time %v", waitTime)
return
}

case <-time.After(waitTime):
fmt.Printf("could not observe election after wait time %v\n", waitTime)
// expected failure here without the bug fix
failMsg <- fmt.Sprintf("could not observe election after wait time %v", waitTime)
return
}
}()

// run the election on keyPrefix
go func() {
defer close(leaderDone)
session, err := concurrency.NewSession(newClient())
if err != nil {
failMsg <- fmt.Sprintf("%v", err)
return
}

e := concurrency.NewElection(session, keyPrefix)
ev := fmt.Sprintf("electval-%v", time.Now().UnixNano())
if err := e.Campaign(context.TODO(), ev); err != nil {
// expected failure here without the bug fix
failMsg <- fmt.Sprintf("failed to elect our lone candidate: %v", err)
return
}
// wait for follower to observe election
fmt.Printf("Campaign completed without error.\n")
// and verify that the observer saw the ev value we promulgated.
var s string
select {
case s = <-electedc:
case <-time.After(waitTime):
failMsg <- fmt.Sprintf("election hung, no election with lone candiate after waitTime %v", waitTime)
return
}
if s != ev {
failMsg <- fmt.Sprintf("wrong election value got '%s', wanted '%s'", s, ev)
return
} else {
fmt.Printf("Good, election observed what we expected: '%s'.\n", ev)
}
}()

// wait for both to finish
var seenFollowerFinish, seenLeaderFinish bool
for !(seenFollowerFinish && seenLeaderFinish) {
select {
case <-followerDone:
seenFollowerFinish = true
followerDone = nil
case <-leaderDone:
seenLeaderFinish = true
leaderDone = nil
case failure := <-failMsg:
t.Fatalf("%s", failure)
}
}

closeClients(t, clients)
}

func putCliKeyValHelper(client *clientv3.Client, key string, val string) error {
opts := []clientv3.OpOption{}
ctx := context.Background()
_, err := client.Put(ctx, key, val, opts...)
return err
}

func getCliKeyHelper(client *clientv3.Client, key string) (val string, err error) {
opts := []clientv3.OpOption{}
ctx := context.Background()
resp, err := client.Get(ctx, key, opts...)
if err != nil {
return "", err
}
return string(resp.Kvs[0].Value), nil
}

0 comments on commit aa7e863

Please sign in to comment.