Skip to content

Commit

Permalink
Merge #32164
Browse files Browse the repository at this point in the history
32164: storage/cmdq: create new signal type for cmd completion signaling r=nvanbenschoten a=nvanbenschoten

`signal` is a type that can signal the completion of an operation.
This is a component of the larger change in #31997.

The type has three benefits over using a channel directly and
closing the channel when the operation completes:
1. signaled() uses atomics to provide a fast-path for checking
   whether the operation has completed. It is ~75x faster than
   using a channel for this purpose.
2. the type's channel is lazily initialized when signalChan()
   is called, avoiding the allocation when one is not needed.
3. because of 2, the type's zero value can be used directly.

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Nov 12, 2018
2 parents 7cb791c + a08d232 commit 9410ea2
Show file tree
Hide file tree
Showing 2 changed files with 297 additions and 0 deletions.
102 changes: 102 additions & 0 deletions pkg/storage/cmdq/signal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package cmdq

import (
"sync/atomic"
"unsafe"
)

const (
// not yet signaled.
noSig int32 = iota
// signaled and the channel was not closed.
sig
// signaled and the channel was closed.
sigClosed
)

// signal is a type that can signal the completion of an operation.
//
// The type has three benefits over using a channel directly and
// closing the channel when the operation completes:
// 1. signaled() uses atomics to provide a fast-path for checking
// whether the operation has completed. It is ~75x faster than
// using a channel for this purpose.
// 2. the receiver's channel is lazily initialized when signalChan()
// is called, avoiding the allocation when one is not needed.
// 3. because of 2, the type's zero value can be used directly.
//
type signal struct {
a int32
c unsafe.Pointer // chan struct{}, lazily initialized
}

func (s *signal) signal() {
if !atomic.CompareAndSwapInt32(&s.a, noSig, sig) {
panic("signaled twice")
}
// Close the channel if it was ever initialized.
if cPtr := atomic.LoadPointer(&s.c); cPtr != nil {
// Coordinate with signalChan to avoid double-closing.
if atomic.CompareAndSwapInt32(&s.a, sig, sigClosed) {
close(ptrToChan(cPtr))
}
}
}

func (s *signal) signaled() bool {
return atomic.LoadInt32(&s.a) > noSig
}

func (s *signal) signalChan() <-chan struct{} {
// If the signal has already been signaled, return a closed channel.
if s.signaled() {
return closedC
}

// If the signal's channel has already been lazily initialized, return it.
if cPtr := atomic.LoadPointer(&s.c); cPtr != nil {
return ptrToChan(cPtr)
}

// Lazily initialize the channel.
c := make(chan struct{})
if !atomic.CompareAndSwapPointer(&s.c, nil, chanToPtr(c)) {
// We raced with another initialization.
return ptrToChan(atomic.LoadPointer(&s.c))
}

// Coordinate with signal to close the new channel, if necessary.
if atomic.CompareAndSwapInt32(&s.a, sig, sigClosed) {
close(c)
}
return c
}

func chanToPtr(c chan struct{}) unsafe.Pointer {
return *(*unsafe.Pointer)(unsafe.Pointer(&c))
}

func ptrToChan(p unsafe.Pointer) chan struct{} {
return *(*chan struct{})(unsafe.Pointer(&p))
}

var closedC chan struct{}

func init() {
closedC = make(chan struct{})
close(closedC)
}
195 changes: 195 additions & 0 deletions pkg/storage/cmdq/signal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package cmdq

import (
"sync"
"sync/atomic"
"testing"

"github.com/stretchr/testify/require"
)

func TestSignal(t *testing.T) {
var s signal
require.False(t, s.signaled())

s.signal()
require.True(t, s.signaled())
require.Equal(t, struct{}{}, <-s.signalChan())
}

func TestSignalConcurrency(t *testing.T) {
const trials = 100
for i := 0; i < trials; i++ {
var s signal
var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
<-s.signalChan()
require.True(t, s.signaled())
}()
go func() {
defer wg.Done()
require.False(t, s.signaled())
s.signal()
require.True(t, s.signaled())
}()
go func() {
defer wg.Done()
<-s.signalChan()
require.True(t, s.signaled())
}()
wg.Wait()
require.True(t, s.signaled())
}
}

func BenchmarkSignaled(b *testing.B) {
var s signal
s.signal()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = s.signaled()
}
}

func BenchmarkSignalBeforeChan(b *testing.B) {
var s signal
for i := 0; i < b.N; i++ {
s = signal{} // reset
s.signal()
}
}

func BenchmarkSignalAfterChan(b *testing.B) {
var s signal
chans := make([]chan struct{}, b.N)
for i := range chans {
chans[i] = make(chan struct{})
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
s = signal{} // reset
s.c = chanToPtr(chans[i])
s.signal()
}
}

func BenchmarkInitialChanBeforeSignal(b *testing.B) {
var s signal
for i := 0; i < b.N; i++ {
s = signal{} // reset
_ = s.signalChan()
}
}

func BenchmarkSecondChanBeforeSignal(b *testing.B) {
var s signal
_ = s.signalChan()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = s.signalChan()
}
}

func BenchmarkInitialChanAfterSignal(b *testing.B) {
var s signal
s.signal()
b.ResetTimer()
for i := 0; i < b.N; i++ {
s.c = nil
_ = s.signalChan()
}
}

func BenchmarkSecondChanAfterSignal(b *testing.B) {
var s signal
s.signal()
_ = s.signalChan()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = s.signalChan()
}
}

// The following is a series of benchmarks demonstrating the value of the signal
// type and the fast-path that it provides. Closing channels to signal
// completion of a task is convenient, but in performance critical code paths it
// is essential to have a way to efficiently check for completion before falling
// back to waiting for the channel to close and entering select blocks. The
// benchmarks demonstrate that a channel on its own cannot be used to perform an
// efficient completion check, which is why the signal type mixes channels with
// atomics. The reason for this is that channels are forced to acquire an
// internal mutex before determining that they are closed and can return a zero
// value. This will always be more expensive than a single atomic load.
//
// Results with go1.10.4 on a Mac with a 3.1 GHz Intel Core i7 processor:
//
// ReadClosedChan-4 24.2ns ± 3%
// SingleSelectClosedChan-4 24.9ns ± 2%
// DefaultSelectClosedChan-4 24.6ns ± 1%
// MultiSelectClosedChan-4 97.9ns ± 2%
// Signaled-4 0.35ns ±13%
//

func BenchmarkReadClosedChan(b *testing.B) {
c := make(chan struct{})
close(c)
for i := 0; i < b.N; i++ {
<-c
}
}

func BenchmarkSingleSelectClosedChan(b *testing.B) {
c := make(chan struct{})
close(c)
//lint:ignore S1000 we don't want this simplified
for i := 0; i < b.N; i++ {
select {
case <-c:
}
}
}

func BenchmarkDefaultSelectClosedChan(b *testing.B) {
c := make(chan struct{})
close(c)
for i := 0; i < b.N; i++ {
select {
case <-c:
default:
}
}
}

func BenchmarkMultiSelectClosedChan(b *testing.B) {
c, c2 := make(chan struct{}), make(chan struct{})
close(c)
for i := 0; i < b.N; i++ {
select {
case <-c:
case <-c2:
}
}
}

func BenchmarkAtomicLoad(b *testing.B) {
a := int32(1)
for i := 0; i < b.N; i++ {
_ = atomic.LoadInt32(&a)
}
}

0 comments on commit 9410ea2

Please sign in to comment.