diff --git a/clientv3/concurrency/example_mutex_test.go b/clientv3/concurrency/example_mutex_test.go index a0463d5b57f2..6b55340cf9bb 100644 --- a/clientv3/concurrency/example_mutex_test.go +++ b/clientv3/concurrency/example_mutex_test.go @@ -23,6 +23,57 @@ import ( "go.etcd.io/etcd/clientv3/concurrency" ) +func ExampleMutex_TryLock() { + cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) + if err != nil { + log.Fatal(err) + } + defer cli.Close() + + // create two separate sessions for lock competition + s1, err := concurrency.NewSession(cli) + if err != nil { + log.Fatal(err) + } + defer s1.Close() + m1 := concurrency.NewMutex(s1, "/my-lock/") + + s2, err := concurrency.NewSession(cli) + if err != nil { + log.Fatal(err) + } + defer s2.Close() + m2 := concurrency.NewMutex(s2, "/my-lock/") + + // acquire lock for s1 + if err = m1.Lock(context.TODO()); err != nil { + log.Fatal(err) + } + fmt.Println("acquired lock for s1") + + if err = m2.TryLock(context.TODO()); err == nil { + log.Fatal("should not acquire lock") + } + if err == concurrency.ErrLocked { + fmt.Println("cannot acquire lock for s2, as already locked in another session") + } + + if err = m1.Unlock(context.TODO()); err != nil { + log.Fatal(err) + } + fmt.Println("released lock for s1") + if err = m2.TryLock(context.TODO()); err != nil { + log.Fatal(err) + } + fmt.Println("acquired lock for s2") + + // Output: + // acquired lock for s1 + // cannot acquire lock for s2, as already locked in another session + // released lock for s1 + // acquired lock for s2 +} + func ExampleMutex_Lock() { cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints}) if err != nil { diff --git a/clientv3/concurrency/mutex.go b/clientv3/concurrency/mutex.go index 013534193ea5..f84f4c9d07d1 100644 --- a/clientv3/concurrency/mutex.go +++ b/clientv3/concurrency/mutex.go @@ -16,6 +16,7 @@ package concurrency import ( "context" + "errors" "fmt" "sync" @@ -23,6 +24,9 @@ import ( pb "go.etcd.io/etcd/etcdserver/etcdserverpb" ) +// ErrLocked is returned by TryLock when Mutex is already locked by another session. +var ErrLocked = errors.New("Mutex: Locked by another session") + // Mutex implements the sync Locker interface with etcd type Mutex struct { s *Session @@ -37,35 +41,43 @@ func NewMutex(s *Session, pfx string) *Mutex { return &Mutex{s, pfx + "/", "", -1, nil} } +// TryLock locks the mutex if not already locked by another session. +// If lock is held by another session, return immediately after attempting necessary cleanup +func (m *Mutex) TryLock(ctx context.Context) error { + resp, err := m.tryAcquire(ctx) + if err != nil { + return err + } + // if no key on prefix / the minimum rev is key, already hold the lock + ownerKey := resp.Responses[1].GetResponseRange().Kvs + if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev { + m.hdr = resp.Header + return nil + } + client := m.s.Client() + // Cannot lock, so delete the key + if _, err := client.Delete(ctx, m.myKey); err != nil { + return err + } + m.myKey = "\x00" + m.myRev = -1 + return ErrLocked +} + // Lock locks the mutex with a cancelable context. If the context is canceled // while trying to acquire the lock, the mutex tries to clean its stale lock entry. func (m *Mutex) Lock(ctx context.Context) error { - s := m.s - client := m.s.Client() - - m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease()) - cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0) - // put self in lock waiters via myKey; oldest waiter holds lock - put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease())) - // reuse key in case this session already holds the lock - get := v3.OpGet(m.myKey) - // fetch current holder to complete uncontended path with only one RPC - getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...) - resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit() + resp, err := m.tryAcquire(ctx) if err != nil { return err } - m.myRev = resp.Header.Revision - if !resp.Succeeded { - m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision - } // if no key on prefix / the minimum rev is key, already hold the lock ownerKey := resp.Responses[1].GetResponseRange().Kvs if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev { m.hdr = resp.Header return nil } - + client := m.s.Client() // wait for deletion revisions prior to myKey hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1) // release lock key if wait failed @@ -77,6 +89,29 @@ func (m *Mutex) Lock(ctx context.Context) error { return werr } +func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) { + s := m.s + client := m.s.Client() + + m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease()) + cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0) + // put self in lock waiters via myKey; oldest waiter holds lock + put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease())) + // reuse key in case this session already holds the lock + get := v3.OpGet(m.myKey) + // fetch current holder to complete uncontended path with only one RPC + getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...) + resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit() + if err != nil { + return nil, err + } + m.myRev = resp.Header.Revision + if !resp.Succeeded { + m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision + } + return resp, nil +} + func (m *Mutex) Unlock(ctx context.Context) error { client := m.s.Client() if _, err := client.Delete(ctx, m.myKey); err != nil {