Skip to content

Commit

Permalink
sqlproxyccl: statistically load balance across tenants
Browse files Browse the repository at this point in the history
    Previously, the SQLProxy's Directory implementation would always
    select the first instance from the pods list. As part of the
    SQLProxy's duty is load balancing, this was inadequate. This commit
    upgrades the tenant dir to have pods report their CPU load. This
    load is then used to perform a weighted distribution across all
    running tenant pods. Load reporters may elect to use the new UNKNOWN
    state to indicate their updates should be ignored if the proxy is
    unaware of a running pod at the given address.

Release note: None
  • Loading branch information
chrisseto committed Jul 12, 2021
1 parent 81e8ce0 commit 868e65e
Show file tree
Hide file tree
Showing 9 changed files with 435 additions and 97 deletions.
5 changes: 4 additions & 1 deletion pkg/ccl/sqlproxyccl/tenant/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
srcs = [
"directory.go",
"entry.go",
"pod.go",
":mocks_tenant", # keep
],
embed = [":tenant_go_proto"],
Expand All @@ -34,6 +35,7 @@ go_library(
"//pkg/roachpb",
"//pkg/util/grpcutil",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
Expand Down Expand Up @@ -61,9 +63,10 @@ go_test(
srcs = [
"directory_test.go",
"main_test.go",
"pod_test.go",
],
embed = [":tenant"],
deps = [
":tenant",
"//pkg/base",
"//pkg/ccl",
"//pkg/ccl/kvccl/kvtenantccl",
Expand Down
33 changes: 23 additions & 10 deletions pkg/ccl/sqlproxyccl/tenant/directory.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,13 @@ func (d *Directory) LookupTenantAddrs(
return nil, status.Errorf(
codes.NotFound, "tenant %d not in directory cache", tenantID.ToUint64())
}
return entry.getPodAddrs(), nil

tenantPods := entry.getPods()
addrs := make([]string, len(tenantPods))
for i, pod := range tenantPods {
addrs[i] = pod.Addr
}
return addrs, nil
}

// ReportFailure should be called when attempts to connect to a particular SQL
Expand Down Expand Up @@ -376,8 +382,7 @@ func (d *Directory) watchPods(ctx context.Context, stopper *stop.Stopper) error
// watcher events. When a pod is created, destroyed, or modified, it updates the
// tenant's entry to reflect that change.
func (d *Directory) updateTenantEntry(ctx context.Context, pod *Pod) {
podAddr := pod.Addr
if podAddr == "" {
if pod.Addr == "" {
// Nothing needs to be done if there is no IP address specified.
return
}
Expand All @@ -393,15 +398,23 @@ func (d *Directory) updateTenantEntry(ctx context.Context, pod *Pod) {
return
}

if pod.State == RUNNING {
// Add addresses of RUNNING pods if they are not already present.
if entry.AddPodAddr(podAddr) {
log.Infof(ctx, "added IP address %s for tenant %d", podAddr, pod.TenantID)
switch pod.State {
case RUNNING:
// Add entries of RUNNING pods if they are not already present.
if entry.AddPod(pod) {
log.Infof(ctx, "added IP address %s with load %.3f for tenant %d", pod.Addr, pod.Load, pod.TenantID)
} else {
log.Infof(ctx, "updated IP address %s with load %.3f for tenant %d", pod.Addr, pod.Load, pod.TenantID)
}
// Update entries of UNKNOWN pods only if they are already present.
case UNKNOWN:
if entry.UpdatePod(pod) {
log.Infof(ctx, "updated IP address %s with load %.3f for tenant %d", pod.Addr, pod.Load, pod.TenantID)
}
} else {
default:
// Remove addresses of DRAINING and DELETING pods.
if entry.RemovePodAddr(podAddr) {
log.Infof(ctx, "deleted IP address %s for tenant %d", podAddr, pod.TenantID)
if entry.RemovePodByAddr(pod.Addr) {
log.Infof(ctx, "deleted IP address %s for tenant %d", pod.Addr, pod.TenantID)
}
}
}
Expand Down
97 changes: 65 additions & 32 deletions pkg/ccl/sqlproxyccl/tenant/directory.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions pkg/ccl/sqlproxyccl/tenant/directory.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ enum PodState {
// DELETING indicates that the pod is being terminated. This state is only
// used by WatchPods.
DELETING = 2;
// UNKNOWN indicates that the pod values being reported are from a
// potentially out of date source. UNKNOWN may be used to notify updates to
// pod values when the pod's state may be out of date by the time the update
// is processed.
UNKNOWN = 3;
}

// Pod contains information about a tenant pod, such as its tenant owner,
Expand All @@ -47,6 +52,9 @@ message Pod {
string Addr = 1;
// PodState gives the current status of the tenant pod.
PodState State = 3;
// Load is a number in the range [0, 1] indicating the current amount of load
// experienced by this tenant pod.
float Load = 4;
}

// WatchPodsResponse represents the notifications that the server sends to
Expand Down
98 changes: 98 additions & 0 deletions pkg/ccl/sqlproxyccl/tenant/directory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,104 @@ func TestRefreshThrottling(t *testing.T) {
require.Equal(t, []string{addr}, addrs)
}

func TestLoadBalancing(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.ScopeWithoutShowLogs(t).Close(t)

// Make pod watcher channel.
podWatcher := make(chan *tenant.Pod, 10)

// Create the directory.
ctx := context.Background()
tc, dir, tds := newTestDirectory(t, tenant.PodWatcher(podWatcher))
defer tc.Stopper().Stop(ctx)

tenantID := roachpb.MakeTenantID(30)
require.NoError(t, createTenant(tc, tenantID))

// Forcibly start two instances of tenantID.
require.NoError(t, tds.StartTenant(ctx, tenantID))
<-podWatcher

require.NoError(t, tds.StartTenant(ctx, tenantID))
<-podWatcher

// Ensure that instance 2 has started.
processes := tds.Get(tenantID)
require.NotNil(t, processes)
require.Len(t, processes, 2)

// Wait for the background watcher to populate both pods.
require.Eventually(t, func() bool {
addrs, _ := dir.LookupTenantAddrs(ctx, tenantID)
return len(addrs) == 2
}, 10*time.Second, 100*time.Millisecond)

var processAddrs []net.Addr
for k := range processes {
processAddrs = append(processAddrs, k)
}

// Both tenants will have the same initial (and fake) load reporting.
// Observe that EnsureTenantAddr evenly distributes load across them.
responses := map[string]int{}
for i := 0; i < 100; i++ {
addr, err := dir.EnsureTenantAddr(ctx, tenantID, "")
require.NoError(t, err)
responses[addr] += 1
}

// Assert that the distribution is roughly 50/50.
require.InDelta(t, responses[processAddrs[0].String()], 50, 15)
require.InDelta(t, responses[processAddrs[1].String()], 50, 15)

// Adjust load such that the distribution will be a 25/75 split.
tds.SetFakeLoad(tenantID, processes[processAddrs[0]].SQL, 0.25)
pod := <-podWatcher
require.Equal(t, float32(0.25), pod.Load)
require.Equal(t, processes[processAddrs[0]].SQL.String(), pod.Addr)

tds.SetFakeLoad(tenantID, processes[processAddrs[1]].SQL, 0.75)
pod = <-podWatcher
require.Equal(t, float32(0.75), pod.Load)
require.Equal(t, processes[processAddrs[1]].SQL.String(), pod.Addr)

// There's no way to syncronize on the directory updating it's internal
// state. It requires 2 loop iterations, so 250ms should be more than
// enough.
time.Sleep(250 * time.Millisecond)

responses = map[string]int{}
for i := 0; i < 100; i++ {
addr, err := dir.EnsureTenantAddr(ctx, tenantID, "")
require.NoError(t, err)
responses[addr] += 1
}

// Observe that the distribution is now a 3/1 split.
require.InDelta(t, responses[processAddrs[0].String()], 75, 15)
require.InDelta(t, responses[processAddrs[1].String()], 25, 15)

// Stop our the running tenants.
for _, process := range processes {
process.Stopper.Stop(ctx)
}

// Wait for the tenantdir to reflect that no tenants are running any more.
require.Eventually(t, func() bool {
addrs, _ := dir.LookupTenantAddrs(ctx, tenantID)
return len(addrs) == 0
}, 10*time.Second, 100*time.Millisecond)

// Set the load on a deleted tenant.
tds.SetFakeLoad(tenantID, processes[processAddrs[0]].SQL, 0.8)
_ = <-podWatcher

// And ensure that the deleted entry does not get resurrected.
addrs, _ := dir.LookupTenantAddrs(ctx, tenantID)
require.Len(t, addrs, 0)
}

func createTenant(tc serverutils.TestClusterInterface, id roachpb.TenantID) error {
srv := tc.Server(0)
conn := srv.InternalExecutor().(*sql.InternalExecutor)
Expand Down
Loading

0 comments on commit 868e65e

Please sign in to comment.