From aa7e863482ed302383c382affb62333ce671bc2d Mon Sep 17 00:00:00 2001 From: "Jason E. Aten" Date: Sun, 28 Aug 2016 00:58:57 -0500 Subject: [PATCH] clientv3/concurrency: allow election on prefixes of keys. After winning an election or obtaining a lock, we auto-append a slash after the provided key prefix. This avoids the previous deadlock due to waiting on the wrong key. Fixes #6278 --- clientv3/concurrency/election.go | 5 +- clientv3/concurrency/mutex.go | 4 +- integration/v3_election_test.go | 160 +++++++++++++++++++++++++++++++ 3 files changed, 164 insertions(+), 5 deletions(-) diff --git a/clientv3/concurrency/election.go b/clientv3/concurrency/election.go index f14cd55e369d..abf647aa6846 100644 --- a/clientv3/concurrency/election.go +++ b/clientv3/concurrency/election.go @@ -40,7 +40,7 @@ type Election struct { // NewElection returns a new election on a given key prefix. func NewElection(s *Session, pfx string) *Election { - return &Election{session: s, keyPrefix: pfx} + return &Election{session: s, keyPrefix: pfx + "/"} } // Campaign puts a value as eligible for the election. It blocks until @@ -49,7 +49,7 @@ func (e *Election) Campaign(ctx context.Context, val string) error { s := e.session client := e.session.Client() - k := fmt.Sprintf("%s/%x", e.keyPrefix, s.Lease()) + k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease()) txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0)) txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease()))) txn = txn.Else(v3.OpGet(k)) @@ -57,7 +57,6 @@ func (e *Election) Campaign(ctx context.Context, val string) error { if err != nil { return err } - e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s if !resp.Succeeded { kv := resp.Responses[0].GetResponseRange().Kvs[0] diff --git a/clientv3/concurrency/mutex.go b/clientv3/concurrency/mutex.go index 298d7636b8b5..39010e47bd60 100644 --- a/clientv3/concurrency/mutex.go +++ b/clientv3/concurrency/mutex.go @@ -32,7 +32,7 @@ type Mutex struct { } func NewMutex(s *Session, pfx string) *Mutex { - return &Mutex{s, pfx, "", -1} + return &Mutex{s, pfx + "/", "", -1} } // Lock locks the mutex with a cancellable context. If the context is cancelled @@ -41,7 +41,7 @@ func (m *Mutex) Lock(ctx context.Context) error { s := m.s client := m.s.Client() - m.myKey = fmt.Sprintf("%s/%x", m.pfx, s.Lease()) + m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease()) cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0) // put self in lock waiters via myKey; oldest waiter holds lock put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease())) diff --git a/integration/v3_election_test.go b/integration/v3_election_test.go index f2b9cd04aa32..5aa7aaef59e8 100644 --- a/integration/v3_election_test.go +++ b/integration/v3_election_test.go @@ -197,3 +197,163 @@ func TestElectionSessionRecampaign(t *testing.T) { t.Fatalf("expected value=%q, got response %v", "def", resp) } } + +// TestElectionOnPrefixOfExistingKey checks that a single +// candidate can be elected on a new key that is a prefix +// of an existing key. To wit, check for regression +// of bug #6278. https://github.com/coreos/etcd/issues/6278 +// +func TestElectionOnPrefixOfExistingKey(t *testing.T) { + keyPrefix := "test" + fullKey := keyPrefix + "election" + arbitraryValue := "whatever" + + clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + defer clus.Terminate(t) + + var clients []*clientv3.Client + newClient := makeMultiNodeClients(t, clus.cluster, &clients) + + // setup test: create fullKey: a key that extends our prefix + cli := newClient() + err := putCliKeyValHelper(cli, fullKey, arbitraryValue) + if err != nil { + t.Fatalf("could not set initial key during test setup: '%s'", err) + } + + chk, err := getCliKeyHelper(cli, fullKey) + if err != nil { + t.Fatalf("could not get initial key during test setup: '%s'", err) + } + if chk != arbitraryValue { + t.Fatalf("could not set fullKey '%s' to value '%s'; instead chk = '%s'", fullKey, arbitraryValue, chk) + } + // okay, setup done, fullKey is set. + fmt.Printf("Test setup done.\n") + // now run a single candidate election on keyPrefix + + electedc := make(chan string) + + // waitTime is how long we wait before declaring + // a regression / the bug has come back. + // Under the correct/fast-path we never wait this long. + waitTime := 5000 * time.Millisecond + + followerDone := make(chan struct{}) + leaderDone := make(chan struct{}) + + // because t.Fatalf must be called on original goroutine, use failMsg. + failMsg := make(chan string) + + // observe the election + go func() { + defer close(followerDone) + session, err := concurrency.NewSession(newClient()) + if err != nil { + failMsg <- fmt.Sprintf("%v", err) + return + } + b := concurrency.NewElection(session, keyPrefix) + + cctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + var s clientv3.GetResponse + select { + case s = <-b.Observe(cctx): + // good, election actually happened. + v2 := s.Kvs[0].Value + + // if v2 is 'whatever' then we are in trouble! + // leader will fail out in this case. + fmt.Printf("Observer of election observed: '%s'.\n", v2) + + select { + case electedc <- string(v2): + // good, on success path so far. + return + case <-leaderDone: + // leader bailed early, so should we. + return + case <-time.After(waitTime): + fmt.Printf("could not observe election after wait time %v\n", waitTime) + // expected failure here without the bug fix + failMsg <- fmt.Sprintf("could not observe election after wait time %v", waitTime) + return + } + + case <-time.After(waitTime): + fmt.Printf("could not observe election after wait time %v\n", waitTime) + // expected failure here without the bug fix + failMsg <- fmt.Sprintf("could not observe election after wait time %v", waitTime) + return + } + }() + + // run the election on keyPrefix + go func() { + defer close(leaderDone) + session, err := concurrency.NewSession(newClient()) + if err != nil { + failMsg <- fmt.Sprintf("%v", err) + return + } + + e := concurrency.NewElection(session, keyPrefix) + ev := fmt.Sprintf("electval-%v", time.Now().UnixNano()) + if err := e.Campaign(context.TODO(), ev); err != nil { + // expected failure here without the bug fix + failMsg <- fmt.Sprintf("failed to elect our lone candidate: %v", err) + return + } + // wait for follower to observe election + fmt.Printf("Campaign completed without error.\n") + // and verify that the observer saw the ev value we promulgated. + var s string + select { + case s = <-electedc: + case <-time.After(waitTime): + failMsg <- fmt.Sprintf("election hung, no election with lone candiate after waitTime %v", waitTime) + return + } + if s != ev { + failMsg <- fmt.Sprintf("wrong election value got '%s', wanted '%s'", s, ev) + return + } else { + fmt.Printf("Good, election observed what we expected: '%s'.\n", ev) + } + }() + + // wait for both to finish + var seenFollowerFinish, seenLeaderFinish bool + for !(seenFollowerFinish && seenLeaderFinish) { + select { + case <-followerDone: + seenFollowerFinish = true + followerDone = nil + case <-leaderDone: + seenLeaderFinish = true + leaderDone = nil + case failure := <-failMsg: + t.Fatalf("%s", failure) + } + } + + closeClients(t, clients) +} + +func putCliKeyValHelper(client *clientv3.Client, key string, val string) error { + opts := []clientv3.OpOption{} + ctx := context.Background() + _, err := client.Put(ctx, key, val, opts...) + return err +} + +func getCliKeyHelper(client *clientv3.Client, key string) (val string, err error) { + opts := []clientv3.OpOption{} + ctx := context.Background() + resp, err := client.Get(ctx, key, opts...) + if err != nil { + return "", err + } + return string(resp.Kvs[0].Value), nil +}