Skip to content

Commit

Permalink
Merge pull request #17625 from petermattis/pmattis/int-map
Browse files Browse the repository at this point in the history
storage: switch Store.{replicas,replicaQueues} from sync.Map to IntMap
  • Loading branch information
petermattis authored Aug 14, 2017
2 parents c2b0ccb + 1c945bd commit cc645f3
Show file tree
Hide file tree
Showing 8 changed files with 1,021 additions and 83 deletions.
54 changes: 27 additions & 27 deletions pkg/storage/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (
"sort"
"sync/atomic"
"time"
"unsafe"

"github.com/coreos/etcd/raft/raftpb"
"github.com/rubyist/circuitbreaker"
"golang.org/x/net/context"
"golang.org/x/sync/syncmap"
"google.golang.org/grpc"

"github.com/cockroachdb/cockroach/pkg/gossip"
Expand Down Expand Up @@ -156,10 +156,10 @@ type RaftTransport struct {
resolver NodeAddressResolver
rpcContext *rpc.Context

queues syncmap.Map // map[roachpb.NodeID]chan *RaftMessageRequest
stats syncmap.Map // map[roachpb.NodeID]*raftTransportStats
breakers syncmap.Map // map[roachpb.NodeID]*circuit.Breaker
handlers syncmap.Map // map[roachpb.StoreID]RaftMessageHandler
queues syncutil.IntMap // map[roachpb.NodeID]*chan *RaftMessageRequest
stats syncutil.IntMap // map[roachpb.NodeID]*raftTransportStats
breakers syncutil.IntMap // map[roachpb.NodeID]*circuit.Breaker
handlers syncutil.IntMap // map[roachpb.StoreID]*RaftMessageHandler
}

// NewDummyRaftTransport returns a dummy raft transport for use in tests which
Expand Down Expand Up @@ -200,18 +200,18 @@ func NewRaftTransport(
select {
case <-ticker.C:
stats = stats[:0]
t.stats.Range(func(k, v interface{}) bool {
s := v.(*raftTransportStats)
t.stats.Range(func(k int64, v unsafe.Pointer) bool {
s := (*raftTransportStats)(v)
// Clear the queue length stat. Note that this field is only
// mutated by this goroutine.
s.queue = 0
stats = append(stats, s)
return true
})

t.queues.Range(func(k, v interface{}) bool {
ch := v.(chan *RaftMessageRequest)
t.getStats(k.(roachpb.NodeID)).queue += len(ch)
t.queues.Range(func(k int64, v unsafe.Pointer) bool {
ch := *(*chan *RaftMessageRequest)(v)
t.getStats((roachpb.NodeID)(k)).queue += len(ch)
return true
})

Expand Down Expand Up @@ -258,17 +258,17 @@ func NewRaftTransport(

func (t *RaftTransport) queuedMessageCount() int64 {
var n int64
t.queues.Range(func(k, v interface{}) bool {
ch := v.(chan *RaftMessageRequest)
t.queues.Range(func(k int64, v unsafe.Pointer) bool {
ch := *(*chan *RaftMessageRequest)(v)
n += int64(len(ch))
return true
})
return n
}

func (t *RaftTransport) getHandler(storeID roachpb.StoreID) (RaftMessageHandler, bool) {
if value, ok := t.handlers.Load(storeID); ok {
return value.(RaftMessageHandler), true
if value, ok := t.handlers.Load(int64(storeID)); ok {
return *(*RaftMessageHandler)(value), true
}
return nil, false
}
Expand Down Expand Up @@ -303,12 +303,12 @@ func newRaftMessageResponse(req *RaftMessageRequest, pErr *roachpb.Error) *RaftM
}

func (t *RaftTransport) getStats(nodeID roachpb.NodeID) *raftTransportStats {
value, ok := t.stats.Load(nodeID)
value, ok := t.stats.Load(int64(nodeID))
if !ok {
stats := &raftTransportStats{nodeID: nodeID}
value, _ = t.stats.LoadOrStore(nodeID, stats)
value, _ = t.stats.LoadOrStore(int64(nodeID), unsafe.Pointer(stats))
}
return value.(*raftTransportStats)
return (*raftTransportStats)(value)
}

// RaftMessageBatch proxies the incoming requests to the listening server interface.
Expand Down Expand Up @@ -399,23 +399,23 @@ func (t *RaftTransport) RaftSnapshot(stream MultiRaft_RaftSnapshotServer) error

// Listen registers a raftMessageHandler to receive proxied messages.
func (t *RaftTransport) Listen(storeID roachpb.StoreID, handler RaftMessageHandler) {
t.handlers.Store(storeID, handler)
t.handlers.Store(int64(storeID), unsafe.Pointer(&handler))
}

// Stop unregisters a raftMessageHandler.
func (t *RaftTransport) Stop(storeID roachpb.StoreID) {
t.handlers.Delete(storeID)
t.handlers.Delete(int64(storeID))
}

// GetCircuitBreaker returns the circuit breaker controlling
// connection attempts to the specified node.
func (t *RaftTransport) GetCircuitBreaker(nodeID roachpb.NodeID) *circuit.Breaker {
value, ok := t.breakers.Load(nodeID)
value, ok := t.breakers.Load(int64(nodeID))
if !ok {
breaker := t.rpcContext.NewBreaker()
value, _ = t.breakers.LoadOrStore(nodeID, breaker)
value, _ = t.breakers.LoadOrStore(int64(nodeID), unsafe.Pointer(breaker))
}
return value.(*circuit.Breaker)
return (*circuit.Breaker)(value)
}

// connectAndProcess connects to the node and then processes the
Expand Down Expand Up @@ -548,12 +548,12 @@ func (t *RaftTransport) processQueue(
// getQueue returns the queue for the specified node ID and a boolean
// indicating whether the queue already exists (true) or was created (false).
func (t *RaftTransport) getQueue(nodeID roachpb.NodeID) (chan *RaftMessageRequest, bool) {
value, ok := t.queues.Load(nodeID)
value, ok := t.queues.Load(int64(nodeID))
if !ok {
ch := make(chan *RaftMessageRequest, raftSendBufferSize)
value, ok = t.queues.LoadOrStore(nodeID, ch)
value, ok = t.queues.LoadOrStore(int64(nodeID), unsafe.Pointer(&ch))
}
return value.(chan *RaftMessageRequest), ok
return *(*chan *RaftMessageRequest)(value), ok
}

// SendAsync sends a message to the recipient specified in the request. It
Expand Down Expand Up @@ -586,10 +586,10 @@ func (t *RaftTransport) SendAsync(req *RaftMessageRequest) bool {
func(ctx context.Context) {
t.rpcContext.Stopper.RunWorker(ctx, func(ctx context.Context) {
t.connectAndProcess(ctx, toNodeID, ch, stats)
t.queues.Delete(toNodeID)
t.queues.Delete(int64(toNodeID))
})
}); err != nil {
t.queues.Delete(toNodeID)
t.queues.Delete(int64(toNodeID))
return false
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -4341,9 +4341,9 @@ func (r *Replica) acquireSplitLock(
rightRng.mu.destroyed = errors.Errorf("%s: failed to initialize", rightRng)
rightRng.mu.Unlock()
r.store.mu.Lock()
r.store.mu.replicas.Delete(rightRng.RangeID)
r.store.mu.replicas.Delete(int64(rightRng.RangeID))
delete(r.store.mu.uninitReplicas, rightRng.RangeID)
r.store.replicaQueues.Delete(rightRng.RangeID)
r.store.replicaQueues.Delete(int64(rightRng.RangeID))
r.store.mu.Unlock()
}
rightRng.raftMu.Unlock()
Expand Down
Loading

0 comments on commit cc645f3

Please sign in to comment.