Skip to content

Commit

Permalink
ccl/sqlproxyccl: add rebalancer queue for rebalance requests
Browse files Browse the repository at this point in the history
This commit adds a rebalancer queue implementation to the balancer component.
The queue will hold the rebalance requests for the connection migration
work. Routing every request through the queue gives the proxy a single,
centralized place that invokes the TransferConnection method on the connection
handles, which in turn lets us limit the number of concurrent transfers within
the proxy.

Release note: None
  • Loading branch information
jaylim-crl committed Apr 5, 2022
1 parent ca927f0 commit 5ff9c33
Show file tree
Hide file tree
Showing 4 changed files with 223 additions and 15 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/sqlproxyccl/balancer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_test(
"//pkg/ccl/sqlproxyccl/tenant",
"//pkg/roachpb",
"//pkg/util/leaktest",
"//pkg/util/timeutil",
"@com_github_stretchr_testify//require",
],
)
88 changes: 88 additions & 0 deletions pkg/ccl/sqlproxyccl/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@
package balancer

import (
"container/list"
"context"
"math"
"math/rand"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/marusama/semaphore"
)

// ErrNoAvailablePods is an error that indicates that no pods are available
Expand Down Expand Up @@ -62,3 +67,86 @@ func (b *Balancer) randFloat32() float32 {
defer b.mu.Unlock()
return b.mu.rng.Float32()
}

// rebalanceRequest corresponds to a rebalance request. For now, this only
// indicates where the connection should be transferred to through dst.
type rebalanceRequest struct {
// createdAt is the time at which the request was created. When two requests
// target the same connection, the queue keeps the one with the later
// createdAt.
createdAt time.Time
// conn is the handle of the connection to be rebalanced. The queue holds at
// most one pending request per conn.
conn ConnectionHandle
// dst identifies where the connection should be transferred to.
// NOTE(review): presumably a pod address; confirm against callers.
dst string
}

// rebalancerQueue represents the balancer's internal queue which is used for
// rebalancing requests. It holds at most one pending request per connection
// handle, and requests are removed from the front in the order in which their
// connection was first inserted.
type rebalancerQueue struct {
// mu is held by enqueue and dequeue while they access queue and elements.
mu syncutil.Mutex
// sem is used to block dequeue while the queue is empty. It is created with
// math.MaxInt32 permits that are all acquired up front; enqueue releases
// permits and dequeue acquires one before removing an item.
sem semaphore.Semaphore
// queue holds the pending *rebalanceRequest values; new connections are
// pushed to the back and items are removed from the front.
queue *list.List
// elements maps a connection handle to its element in queue, so that an
// existing request for the same connection can be located and replaced.
elements map[ConnectionHandle]*list.Element
}

// newRebalancerQueue constructs an empty rebalancerQueue.
//
// The queue's semaphore is created with math.MaxInt32 permits, all of which
// are acquired before the queue is handed back: a fully-acquired semaphore
// denotes an empty queue. If that initial acquisition fails, the error is
// returned and no queue is created.
func newRebalancerQueue(ctx context.Context) (*rebalancerQueue, error) {
	const permits = math.MaxInt32

	// The semaphore counts the items in the queue, so take every permit up
	// front to start from "zero items".
	sem := semaphore.New(permits)
	if err := sem.Acquire(ctx, permits); err != nil {
		return nil, err
	}

	return &rebalancerQueue{
		sem:      sem,
		queue:    list.New(),
		elements: make(map[ConnectionHandle]*list.Element),
	}, nil
}

// enqueue puts the rebalance request into the queue. If a request for the
// connection already exists, the newer of the two (by createdAt) is kept in
// the existing queue position and the queue length does not change.
//
// The semaphore tracks the number of items in the queue, so a permit is
// released only when a new element is actually added. Releasing a permit on a
// replacement would leave more permits than items, and a later dequeue would
// acquire a permit only to find the queue unexpectedly empty.
//
// NOTE: req should not be nil.
func (q *rebalancerQueue) enqueue(req *rebalanceRequest) {
	q.mu.Lock()
	defer q.mu.Unlock()

	if e, ok := q.elements[req.conn]; ok {
		// Use the newer request of the two. The element count is unchanged,
		// so the semaphore must not be touched here.
		if e.Value.(*rebalanceRequest).createdAt.Before(req.createdAt) {
			e.Value = req
		}
		return
	}

	// A new connection: append it and signal exactly one more available item.
	q.elements[req.conn] = q.queue.PushBack(req)
	q.sem.Release(1)
}

// dequeue pops the request at the head of the queue and returns it. When the
// queue holds no items, the call blocks until one is enqueued or until the
// semaphore acquisition fails (for example because ctx is done), in which
// case that error is returned.
//
// NOTE: It is unsafe to continue using the queue if dequeue returns an error.
func (q *rebalancerQueue) dequeue(ctx context.Context) (*rebalanceRequest, error) {
	// Wait for an item before taking mu: enqueue needs mu to add items, so
	// holding it while blocked here would prevent the queue from ever
	// becoming non-empty. Acquire may return an error AND still have
	// obtained the semaphore, which is why the queue must not be reused
	// after an error.
	err := q.sem.Acquire(ctx, 1)
	if err != nil {
		return nil, err
	}

	q.mu.Lock()
	defer q.mu.Unlock()

	head := q.queue.Front()
	if head == nil {
		// A permit was obtained, so there must be an item.
		return nil, errors.AssertionFailedf("unexpected empty queue")
	}

	popped := q.queue.Remove(head).(*rebalanceRequest)
	delete(q.elements, popped.conn)
	return popped, nil
}
127 changes: 123 additions & 4 deletions pkg/ccl/sqlproxyccl/balancer/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,28 @@
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package balancer_test
package balancer

import (
"context"
"fmt"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/balancer"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

func TestBalancer(t *testing.T) {
defer leaktest.AfterTest(t)()

b := balancer.NewBalancer()
b := NewBalancer()

t.Run("no pods", func(t *testing.T) {
pod, err := b.SelectTenantPod([]*tenant.Pod{})
require.EqualError(t, err, balancer.ErrNoAvailablePods.Error())
require.EqualError(t, err, ErrNoAvailablePods.Error())
require.Nil(t, pod)
})

Expand All @@ -34,3 +37,119 @@ func TestBalancer(t *testing.T) {
require.Contains(t, []string{"1", "2"}, pod.Addr)
})
}

// TestRebalancerQueue exercises the basic queue contract: requests for the
// same connection collapse into a single entry holding the newest request,
// distinct connections are dequeued in insertion order, and dequeue fails
// with the context's error once the context has been canceled.
func TestRebalancerQueue(t *testing.T) {
	defer leaktest.AfterTest(t)()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	q, err := newRebalancerQueue(ctx)
	require.NoError(t, err)

	// Use a custom time source for testing.
	t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
	timeSource := timeutil.NewManualTime(t0)

	// Create rebalance requests for the same connection handle.
	conn1 := &testBalancerConnHandle{}
	req1 := &rebalanceRequest{
		createdAt: timeSource.Now(),
		conn:      conn1,
		dst:       "foo1",
	}
	timeSource.Advance(5 * time.Second)
	req2 := &rebalanceRequest{
		createdAt: timeSource.Now(),
		conn:      conn1,
		dst:       "foo2",
	}
	timeSource.Advance(5 * time.Second)
	req3 := &rebalanceRequest{
		createdAt: timeSource.Now(),
		conn:      conn1,
		dst:       "foo3",
	}

	// Enqueue in a specific order. req3 overrides req1; req2 is a no-op.
	q.enqueue(req1)
	q.enqueue(req3)
	q.enqueue(req2)
	require.Len(t, q.elements, 1)
	require.Equal(t, 1, q.queue.Len())

	// Create another request.
	conn2 := &testBalancerConnHandle{}
	req4 := &rebalanceRequest{
		createdAt: timeSource.Now(),
		conn:      conn2,
		dst:       "bar1",
	}
	q.enqueue(req4)
	require.Len(t, q.elements, 2)
	require.Equal(t, 2, q.queue.Len())

	// Dequeue the items.
	item, err := q.dequeue(ctx)
	require.NoError(t, err)
	require.Equal(t, req3, item)
	item, err = q.dequeue(ctx)
	require.NoError(t, err)
	require.Equal(t, req4, item)
	require.Empty(t, q.elements)
	require.Equal(t, 0, q.queue.Len())

	// Cancel the context. Dequeue should return immediately with an error
	// and no request. The result is stored in item (not req4) so that an
	// unrelated variable is not clobbered and the value is actually checked.
	cancel()
	item, err = q.dequeue(ctx)
	require.EqualError(t, err, context.Canceled.Error())
	require.Nil(t, item)
}

// TestRebalancerQueueBlocking verifies that a consumer blocked in dequeue is
// woken up as items are enqueued, and that requests for distinct connections
// come out in the order in which they were enqueued.
func TestRebalancerQueueBlocking(t *testing.T) {
	defer leaktest.AfterTest(t)()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	q, err := newRebalancerQueue(ctx)
	require.NoError(t, err)

	reqCh := make(chan *rebalanceRequest, 10)
	go func() {
		for {
			req, err := q.dequeue(ctx)
			if err != nil {
				return
			}
			// Also watch ctx while sending: if the test stops reading from
			// reqCh early (e.g. a failed assertion), the deferred cancel
			// must still be able to terminate this goroutine instead of
			// leaving it blocked on a full channel.
			select {
			case reqCh <- req:
			case <-ctx.Done():
				return
			}
		}
	}()

	// Use a custom time source for testing.
	t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC)
	timeSource := timeutil.NewManualTime(t0)

	const reqCount = 100
	for i := 0; i < reqCount; i++ {
		req := &rebalanceRequest{
			createdAt: timeSource.Now(),
			conn:      &testBalancerConnHandle{},
			dst:       fmt.Sprint(i),
		}
		q.enqueue(req)
		timeSource.Advance(1 * time.Second)
	}

	for i := 0; i < reqCount; i++ {
		req := <-reqCh
		require.Equal(t, fmt.Sprint(i), req.dst)
	}
}

// testBalancerConnHandle is a test connection handle that is used for testing
// the balancer. This currently does not require any methods to be implemented.
//
// The tests construct it with the embedded ConnectionHandle left nil, so the
// type satisfies the interface at compile time, but calling any promoted
// interface method on such a value would panic at runtime. The queue tests
// only use the pointer as a distinct map key.
type testBalancerConnHandle struct {
ConnectionHandle
}

var _ ConnectionHandle = &testBalancerConnHandle{}
22 changes: 11 additions & 11 deletions pkg/ccl/sqlproxyccl/balancer/conn_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ func TestConnTracker(t *testing.T) {
defer leaktest.AfterTest(t)()

tracker := NewConnTracker()
makeConn := func(tenantID int, podAddr string) (roachpb.TenantID, *testConnectionHandle) {
return roachpb.MakeTenantID(uint64(tenantID)), newTestConnectionHandle(podAddr)
makeConn := func(tenantID int, podAddr string) (roachpb.TenantID, *testTrackerConnHandle) {
return roachpb.MakeTenantID(uint64(tenantID)), newTestTrackerConnHandle(podAddr)
}

tenantID, handle := makeConn(20, "127.0.0.10:8090")
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestTenantEntry(t *testing.T) {

entry := newTenantEntry()

h1 := newTestConnectionHandle("10.0.0.1:12345")
h1 := newTestTrackerConnHandle("10.0.0.1:12345")
require.True(t, entry.addHandle(h1))
require.False(t, entry.addHandle(h1))

Expand All @@ -83,25 +83,25 @@ func TestTenantEntry(t *testing.T) {
require.Len(t, conns, 1)
}

// testConnectionHandle is a test connection handle that only implements a
// small subset of methods.
type testConnectionHandle struct {
// testTrackerConnHandle is a test connection handle that only implements a
// small subset of methods used for testing the connection tracker.
type testTrackerConnHandle struct {
ConnectionHandle
remoteAddr string
}

var _ ConnectionHandle = &testConnectionHandle{}
var _ ConnectionHandle = &testTrackerConnHandle{}

func newTestConnectionHandle(remoteAddr string) *testConnectionHandle {
return &testConnectionHandle{remoteAddr: remoteAddr}
func newTestTrackerConnHandle(remoteAddr string) *testTrackerConnHandle {
return &testTrackerConnHandle{remoteAddr: remoteAddr}
}

// Context implements the ConnectionHandle interface.
func (h *testConnectionHandle) Context() context.Context {
func (h *testTrackerConnHandle) Context() context.Context {
return context.Background()
}

// ServerRemoteAddr implements the ConnectionHandle interface.
func (h *testConnectionHandle) ServerRemoteAddr() string {
func (h *testTrackerConnHandle) ServerRemoteAddr() string {
return h.remoteAddr
}

0 comments on commit 5ff9c33

Please sign in to comment.