From 5ff9c3306bff2098980ed418293bd2e164dbc206 Mon Sep 17 00:00:00 2001 From: Jay Date: Mon, 4 Apr 2022 10:32:39 -0400 Subject: [PATCH] ccl/sqlproxyccl: add rebalancer queue for rebalance requests This commit adds a rebalancer queue implementation to the balancer component. The queue will be used for rebalance requests for the connection migration work. This is done to ensure a centralized location that invokes the TransferConnection method on the connection handles. Doing this also enables us to limit the number of concurrent transfers within the proxy. Release note: None --- pkg/ccl/sqlproxyccl/balancer/BUILD.bazel | 1 + pkg/ccl/sqlproxyccl/balancer/balancer.go | 88 ++++++++++++ pkg/ccl/sqlproxyccl/balancer/balancer_test.go | 127 +++++++++++++++++- .../sqlproxyccl/balancer/conn_tracker_test.go | 22 +-- 4 files changed, 223 insertions(+), 15 deletions(-) diff --git a/pkg/ccl/sqlproxyccl/balancer/BUILD.bazel b/pkg/ccl/sqlproxyccl/balancer/BUILD.bazel index 3739e2a8e8b8..5e2e02b080e6 100644 --- a/pkg/ccl/sqlproxyccl/balancer/BUILD.bazel +++ b/pkg/ccl/sqlproxyccl/balancer/BUILD.bazel @@ -33,6 +33,7 @@ go_test( "//pkg/ccl/sqlproxyccl/tenant", "//pkg/roachpb", "//pkg/util/leaktest", + "//pkg/util/timeutil", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/ccl/sqlproxyccl/balancer/balancer.go b/pkg/ccl/sqlproxyccl/balancer/balancer.go index d0b166f797ae..6892d964fdf6 100644 --- a/pkg/ccl/sqlproxyccl/balancer/balancer.go +++ b/pkg/ccl/sqlproxyccl/balancer/balancer.go @@ -9,12 +9,17 @@ package balancer import ( + "container/list" + "context" + "math" "math/rand" + "time" "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" + "github.com/marusama/semaphore" ) // ErrNoAvailablePods is an error that indicates that no pods are available @@ -62,3 +67,86 @@ func (b *Balancer) randFloat32() float32 { defer b.mu.Unlock() return b.mu.rng.Float32() } + +// rebalanceRequest corresponds to a rebalance request. For now, this only +// indicates where the connection should be transferred to through dst. +type rebalanceRequest struct { + createdAt time.Time + conn ConnectionHandle + dst string +} + +// rebalancerQueue represents the balancer's internal queue which is used for +// rebalancing requests. +type rebalancerQueue struct { + mu syncutil.Mutex + sem semaphore.Semaphore + queue *list.List + elements map[ConnectionHandle]*list.Element +} + +// newRebalancerQueue returns a new instance of rebalancerQueue. +func newRebalancerQueue(ctx context.Context) (*rebalancerQueue, error) { + q := &rebalancerQueue{ + sem: semaphore.New(math.MaxInt32), + queue: list.New(), + elements: make(map[ConnectionHandle]*list.Element), + } + // sem represents the number of items in the queue, so we'll acquire + // everything to denote an empty queue. + if err := q.sem.Acquire(ctx, math.MaxInt32); err != nil { + return nil, err + } + return q, nil +} + +// enqueue puts the rebalance request into the queue. If a request for the +// connection already exists, the newer of the two will be used. This returns +// nil if the operation succeeded. +// +// NOTE: req should not be nil. +func (q *rebalancerQueue) enqueue(req *rebalanceRequest) { + q.mu.Lock() + defer q.mu.Unlock() + + e, ok := q.elements[req.conn] + if ok { + // Use the newer request of the two. + if e.Value.(*rebalanceRequest).createdAt.Before(req.createdAt) { + e.Value = req + } + } else { + e = q.queue.PushBack(req) + q.elements[req.conn] = e + } + q.sem.Release(1) +} + +// dequeue removes a request at the front of the queue, and returns that. If the +// queue has no items, dequeue will block until the queue is non-empty. +// +// NOTE: It is unsafe to continue using the queue if dequeue returns an error. +func (q *rebalancerQueue) dequeue(ctx context.Context) (*rebalanceRequest, error) { + // Block until there is an item in the queue. There is a possibility where + // Acquire returns an error AND obtains the semaphore. It is unsafe to + // continue using the queue when that happens. + // + // It is deliberate to block on acquiring the semaphore before obtaining + // the mu lock. We need that lock to enqueue items. + if err := q.sem.Acquire(ctx, 1); err != nil { + return nil, err + } + + q.mu.Lock() + defer q.mu.Unlock() + + e := q.queue.Front() + if e == nil { + // The queue cannot be empty here. + return nil, errors.AssertionFailedf("unexpected empty queue") + } + + req := q.queue.Remove(e).(*rebalanceRequest) + delete(q.elements, req.conn) + return req, nil +} diff --git a/pkg/ccl/sqlproxyccl/balancer/balancer_test.go b/pkg/ccl/sqlproxyccl/balancer/balancer_test.go index 9c687a6be868..1b07171e4805 100644 --- a/pkg/ccl/sqlproxyccl/balancer/balancer_test.go +++ b/pkg/ccl/sqlproxyccl/balancer/balancer_test.go @@ -6,25 +6,28 @@ // // https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt -package balancer_test +package balancer import ( + "context" + "fmt" "testing" + "time" - "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/balancer" "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/stretchr/testify/require" ) func TestBalancer(t *testing.T) { defer leaktest.AfterTest(t)() - b := balancer.NewBalancer() + b := NewBalancer() t.Run("no pods", func(t *testing.T) { pod, err := b.SelectTenantPod([]*tenant.Pod{}) - require.EqualError(t, err, balancer.ErrNoAvailablePods.Error()) + require.EqualError(t, err, ErrNoAvailablePods.Error()) require.Nil(t, pod) }) @@ -34,3 +37,119 @@ func TestBalancer(t *testing.T) { require.Contains(t, []string{"1", "2"}, pod.Addr) }) } + +func TestRebalancerQueue(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + q, err := newRebalancerQueue(ctx) + require.NoError(t, err) + + // Use a custom time source for testing. + t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) + timeSource := timeutil.NewManualTime(t0) + + // Create rebalance requests for the same connection handle. + conn1 := &testBalancerConnHandle{} + req1 := &rebalanceRequest{ + createdAt: timeSource.Now(), + conn: conn1, + dst: "foo1", + } + timeSource.Advance(5 * time.Second) + req2 := &rebalanceRequest{ + createdAt: timeSource.Now(), + conn: conn1, + dst: "foo2", + } + timeSource.Advance(5 * time.Second) + req3 := &rebalanceRequest{ + createdAt: timeSource.Now(), + conn: conn1, + dst: "foo3", + } + + // Enqueue in a specific order. req3 overrides req1; req2 is a no-op. + q.enqueue(req1) + q.enqueue(req3) + q.enqueue(req2) + require.Len(t, q.elements, 1) + require.Equal(t, 1, q.queue.Len()) + + // Create another request. + conn2 := &testBalancerConnHandle{} + req4 := &rebalanceRequest{ + createdAt: timeSource.Now(), + conn: conn2, + dst: "bar1", + } + q.enqueue(req4) + require.Len(t, q.elements, 2) + require.Equal(t, 2, q.queue.Len()) + + // Dequeue the items. + item, err := q.dequeue(ctx) + require.NoError(t, err) + require.Equal(t, req3, item) + item, err = q.dequeue(ctx) + require.NoError(t, err) + require.Equal(t, req4, item) + require.Empty(t, q.elements) + require.Equal(t, 0, q.queue.Len()) + + // Cancel the context. Dequeue should return immediately with an error. + cancel() + req4, err = q.dequeue(ctx) + require.EqualError(t, err, context.Canceled.Error()) +} + +func TestRebalancerQueueBlocking(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + q, err := newRebalancerQueue(ctx) + require.NoError(t, err) + + reqCh := make(chan *rebalanceRequest, 10) + go func() { + for { + req, err := q.dequeue(ctx) + if err != nil { + break + } + reqCh <- req + } + }() + + // Use a custom time source for testing. + t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) + timeSource := timeutil.NewManualTime(t0) + + const reqCount = 100 + for i := 0; i < reqCount; i++ { + req := &rebalanceRequest{ + createdAt: timeSource.Now(), + conn: &testBalancerConnHandle{}, + dst: fmt.Sprint(i), + } + q.enqueue(req) + timeSource.Advance(1 * time.Second) + } + + for i := 0; i < reqCount; i++ { + req := <-reqCh + require.Equal(t, fmt.Sprint(i), req.dst) + } +} + +// testBalancerConnHandle is a test connection handle that is used for testing +// the balancer. This currently does not require any methods to be implemented. +type testBalancerConnHandle struct { + ConnectionHandle +} + +var _ ConnectionHandle = &testBalancerConnHandle{} diff --git a/pkg/ccl/sqlproxyccl/balancer/conn_tracker_test.go b/pkg/ccl/sqlproxyccl/balancer/conn_tracker_test.go index 8f5072f163ec..ff1c2b288128 100644 --- a/pkg/ccl/sqlproxyccl/balancer/conn_tracker_test.go +++ b/pkg/ccl/sqlproxyccl/balancer/conn_tracker_test.go @@ -25,8 +25,8 @@ func TestConnTracker(t *testing.T) { defer leaktest.AfterTest(t)() tracker := NewConnTracker() - makeConn := func(tenantID int, podAddr string) (roachpb.TenantID, *testConnectionHandle) { - return roachpb.MakeTenantID(uint64(tenantID)), newTestConnectionHandle(podAddr) + makeConn := func(tenantID int, podAddr string) (roachpb.TenantID, *testTrackerConnHandle) { + return roachpb.MakeTenantID(uint64(tenantID)), newTestTrackerConnHandle(podAddr) } tenantID, handle := makeConn(20, "127.0.0.10:8090") @@ -69,7 +69,7 @@ func TestTenantEntry(t *testing.T) { entry := newTenantEntry() - h1 := newTestConnectionHandle("10.0.0.1:12345") + h1 := newTestTrackerConnHandle("10.0.0.1:12345") require.True(t, entry.addHandle(h1)) require.False(t, entry.addHandle(h1)) @@ -83,25 +83,25 @@ func TestTenantEntry(t *testing.T) { require.Len(t, conns, 1) } -// testConnectionHandle is a test connection handle that only implements a -// small subset of methods. -type testConnectionHandle struct { +// testTrackerConnHandle is a test connection handle that only implements a +// small subset of methods used for testing the connection tracker. +type testTrackerConnHandle struct { ConnectionHandle remoteAddr string } -var _ ConnectionHandle = &testConnectionHandle{} +var _ ConnectionHandle = &testTrackerConnHandle{} -func newTestConnectionHandle(remoteAddr string) *testConnectionHandle { - return &testConnectionHandle{remoteAddr: remoteAddr} +func newTestTrackerConnHandle(remoteAddr string) *testTrackerConnHandle { + return &testTrackerConnHandle{remoteAddr: remoteAddr} } // Context implements the ConnectionHandle interface. -func (h *testConnectionHandle) Context() context.Context { +func (h *testTrackerConnHandle) Context() context.Context { return context.Background() } // ServerRemoteAddr implements the ConnectionHandle interface. -func (h *testConnectionHandle) ServerRemoteAddr() string { +func (h *testTrackerConnHandle) ServerRemoteAddr() string { return h.remoteAddr }