Skip to content

Commit

Permalink
Merge pull request #8356 from heyitsanthony/election-example
Browse files Browse the repository at this point in the history
concurrency: add examples
  • Loading branch information
Anthony Romano authored Aug 3, 2017
2 parents 033c0cb + b7b31e5 commit 107c18f
Show file tree
Hide file tree
Showing 4 changed files with 306 additions and 0 deletions.
90 changes: 90 additions & 0 deletions clientv3/concurrency/example_election_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package concurrency_test

import (
"context"
"fmt"
"log"
"sync"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
)

func ExampleElection_Campaign() {
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()

// create two separate sessions for election competition
s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()
e1 := concurrency.NewElection(s1, "/my-election/")

s2, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s2.Close()
e2 := concurrency.NewElection(s2, "/my-election/")

// create competing candidates, with e1 initially losing to e2
var wg sync.WaitGroup
wg.Add(2)
electc := make(chan *concurrency.Election, 2)
go func() {
defer wg.Done()
// delay candidacy so e2 wins first
time.Sleep(3 * time.Second)
if err := e1.Campaign(context.Background(), "e1"); err != nil {
log.Fatal(err)
}
electc <- e1
}()
go func() {
defer wg.Done()
if err := e2.Campaign(context.Background(), "e2"); err != nil {
log.Fatal(err)
}
electc <- e2
}()

cctx, cancel := context.WithCancel(context.TODO())
defer cancel()

e := <-electc
fmt.Println("completed first election with", string((<-e.Observe(cctx)).Kvs[0].Value))

// resign so next candidate can be elected
if err := e.Resign(context.TODO()); err != nil {
log.Fatal(err)
}

e = <-electc
fmt.Println("completed second election with", string((<-e.Observe(cctx)).Kvs[0].Value))

wg.Wait()

// Output:
// completed first election with e2
// completed second election with e1
}
75 changes: 75 additions & 0 deletions clientv3/concurrency/example_mutex_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package concurrency_test

import (
"context"
"fmt"
"log"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
)

func ExampleMutex_Lock() {
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()

// create two separate sessions for lock competition
s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, "/my-lock/")

s2, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s2.Close()
m2 := concurrency.NewMutex(s2, "/my-lock/")

// acquire lock for s1
if err := m1.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s1")

m2Locked := make(chan struct{})
go func() {
defer close(m2Locked)
// wait until s1 is locks /my-lock/
if err := m2.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
}()

if err := m1.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("released lock for s1")

<-m2Locked
fmt.Println("acquired lock for s2")

// Output:
// acquired lock for s1
// released lock for s1
// acquired lock for s2
}
97 changes: 97 additions & 0 deletions clientv3/concurrency/example_stm_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package concurrency_test

import (
"context"
"fmt"
"log"
"math/rand"
"sync"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
)

// ExampleSTM_apply shows how to use STM with a transactional
// transfer between balances.
func ExampleSTM_apply() {
cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
if err != nil {
log.Fatal(err)
}
defer cli.Close()

// set up "accounts"
totalAccounts := 5
for i := 0; i < totalAccounts; i++ {
k := fmt.Sprintf("accts/%d", i)
if _, err = cli.Put(context.TODO(), k, "100"); err != nil {
log.Fatal(err)
}
}

exchange := func(stm concurrency.STM) error {
from, to := rand.Intn(totalAccounts), rand.Intn(totalAccounts)
if from == to {
// nothing to do
return nil
}
// read values
fromK, toK := fmt.Sprintf("accts/%d", from), fmt.Sprintf("accts/%d", to)
fromV, toV := stm.Get(fromK), stm.Get(toK)
fromInt, toInt := 0, 0
fmt.Sscanf(fromV, "%d", &fromInt)
fmt.Sscanf(toV, "%d", &toInt)

// transfer amount
xfer := fromInt / 2
fromInt, toInt = fromInt-xfer, toInt-xfer

// writeback
stm.Put(fromK, fmt.Sprintf("%d", fromInt))
stm.Put(toK, fmt.Sprintf("%d", toInt))
return nil
}

// concurrently exchange values between accounts
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
if _, serr := concurrency.NewSTM(cli, exchange); serr != nil {
log.Fatal(serr)
}
}()
}
wg.Wait()

// confirm account sum matches sum from beginning.
sum := 0
accts, err := cli.Get(context.TODO(), "accts/", clientv3.WithPrefix())
if err != nil {
log.Fatal(err)
}
for _, kv := range accts.Kvs {
v := 0
fmt.Sscanf(string(kv.Value), "%d", &v)
sum += v
}

fmt.Println("account sum is", sum)
// Output:
// account sum is 500
}
44 changes: 44 additions & 0 deletions clientv3/concurrency/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package concurrency_test

import (
"fmt"
"os"
"testing"
"time"

"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
)

var endpoints []string

// TestMain sets up an etcd cluster for running the examples.
func TestMain(m *testing.M) {
cfg := integration.ClusterConfig{Size: 1}
clus := integration.NewClusterV3(nil, &cfg)
endpoints = []string{clus.Client(0).Endpoints()[0]}
v := m.Run()
clus.Terminate(nil)
if err := testutil.CheckAfterTest(time.Second); err != nil {
fmt.Fprintf(os.Stderr, "%v", err)
os.Exit(1)
}
if v == 0 && testutil.CheckLeakedGoroutine() {
os.Exit(1)
}
os.Exit(v)
}

0 comments on commit 107c18f

Please sign in to comment.