Skip to content

Commit

Permalink
ccl/sqlproxyccl: include DRAINING pods in the directory cache
Browse files Browse the repository at this point in the history
Previously, #67452 removed DRAINING pods from the directory cache. This commit
adds that back. The connector will now need to filter for RUNNING pods manually
before invoking the balancer. This is needed so that we could track DRAINING
pods, and wait until 60 seconds has elapsed before transferring connections
away from them. To support that, we also update the Pod's proto definition to
include a StateTimestamp field to reprevent that timestamp that the state field
was last updated.

The plan is to have a polling mechanism every X seconds to check DRAINING pods,
and use that information to start migrating connections.

Release note: None
  • Loading branch information
jaylim-crl committed Apr 4, 2022
1 parent bb2c29c commit 5c78e85
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 100 deletions.
15 changes: 9 additions & 6 deletions pkg/ccl/sqlproxyccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,17 @@ func (c *connector) lookupAddr(ctx context.Context) (string, error) {
pods, err := c.DirectoryCache.LookupTenantPods(ctx, c.TenantID, c.ClusterName)
switch {
case err == nil:
// Note that LookupTenantPods will always return RUNNING pods, so this
// is fine for now. If we start changing that to also return DRAINING
// pods, we'd have to filter accordingly.
pod, err := c.Balancer.SelectTenantPod(pods)
runningPods := make([]*tenant.Pod, 0, len(pods))
for _, pod := range pods {
if pod.State == tenant.RUNNING {
runningPods = append(runningPods, pod)
}
}
pod, err := c.Balancer.SelectTenantPod(runningPods)
if err != nil {
// This should never happen because LookupTenantPods ensured that
// len(pods) should never be 0. Mark it as a retriable connection
// anyway.
// there should be at least one RUNNING pod. Mark it as a retriable
// connection anyway.
return "", markAsRetriableConnectorError(err)
}
return pod.Addr, nil
Expand Down
11 changes: 9 additions & 2 deletions pkg/ccl/sqlproxyccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,10 @@ func TestConnector_lookupAddr(t *testing.T) {
require.Equal(t, ctx, fnCtx)
require.Equal(t, c.TenantID, tenantID)
require.Equal(t, c.ClusterName, clusterName)
return []*tenant.Pod{{Addr: "127.0.0.10:80"}}, nil
return []*tenant.Pod{
{Addr: "127.0.0.10:70", State: tenant.DRAINING},
{Addr: "127.0.0.10:80", State: tenant.RUNNING},
}, nil
},
}

Expand Down Expand Up @@ -536,7 +539,11 @@ func TestConnector_lookupAddr(t *testing.T) {

pods := make([]*tenant.Pod, 0, len(mu.pods))
for addr, load := range mu.pods {
pods = append(pods, &tenant.Pod{Addr: addr, Load: load})
pods = append(pods, &tenant.Pod{
Addr: addr,
Load: load,
State: tenant.RUNNING,
})
}
return pods, nil
},
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/sqlproxyccl/tenant/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ proto_library(
srcs = ["directory.proto"],
strip_import_prefix = "/pkg",
visibility = ["//visibility:public"],
deps = ["@com_github_gogo_protobuf//gogoproto:gogo_proto"],
deps = [
"@com_github_gogo_protobuf//gogoproto:gogo_proto",
"@com_google_protobuf//:timestamp_proto",
],
)

go_proto_library(
Expand Down
63 changes: 33 additions & 30 deletions pkg/ccl/sqlproxyccl/tenant/directory.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,21 @@ package cockroach.ccl.sqlproxyccl.tenant;
option go_package="tenant";

import "gogoproto/gogo.proto";

// WatchPodsRequest is empty as we want to get all notifications.
message WatchPodsRequest {}
import "google/protobuf/timestamp.proto";

// PodState gives the current state of a tenant pod, so that the proxy knows
// how/where to route traffic.
// NOTE: This is not the same as the Kubernetes Pod Status.
enum PodState {
option (gogoproto.goproto_enum_prefix) = false;

// UNKNOWN indicates that the pod values being reported are from a
// potentially out of date source. UNKNOWN may be used to notify updates to
// pod values when the pod's state may be out of date by the time the update
// is processed.
// UNKNOWN indicates that the pod values being reported are from a potentially
// out of date source. UNKNOWN may be used to notify updates to pod values
// when the pod's state may be out of date by the time the update is processed.
UNKNOWN = 0;
// RUNNING indicates the pod may have active SQL connections and is ready to
// accept new SQL connections.
//
// NOTE: The proxy must still be prepared to retry connections against a
// running pod in case of transient failures.
RUNNING = 1;
Expand All @@ -46,57 +44,62 @@ enum PodState {
// location, and state.
message Pod {
// TenantID is the tenant that owns the pod.
uint64 tenant_id = 2[(gogoproto.customname) = "TenantID"];
// Addr is the ip and port combo identifying the tenant pod, (e.g.
uint64 tenant_id = 2 [(gogoproto.customname) = "TenantID"];
// addr is the ip and port combination identifying the tenant pod, (e.g.
// 132.130.1.11:34576).
string Addr = 1;
// PodState gives the current status of the tenant pod.
PodState State = 3;
string addr = 1;
// state gives the current status of the tenant pod.
PodState state = 3;
// Load is a number in the range [0, 1] indicating the current amount of load
// experienced by this tenant pod.
float Load = 4;
// StateTimestamp represents the timestamp that the state was last updated.
google.protobuf.Timestamp stateTimestamp = 5 [(gogoproto.nullable) = false, (gogoproto.stdtime) = true];
}

// ListPodsRequest is used to query the server for the list of current pods of
// a given tenant.
message ListPodsRequest {
// TenantID identifies the tenant for which the client is requesting a list of
// the pods.
uint64 tenant_id = 1 [(gogoproto.customname) = "TenantID"];
}

// ListPodsResponse is sent back as a result of requesting the list of pods for
// a given tenant.
message ListPodsResponse {
// Pods is the list of RUNNING and/or DRAINING pods for the requested tenant.
// It does not include DELETING pods.
repeated Pod pods = 1;
}

// WatchPodsRequest is empty as we want to get all notifications.
message WatchPodsRequest {}

// WatchPodsResponse represents the notifications that the server sends to
// its clients when clients want to monitor the directory server activity.
message WatchPodsResponse {
// Pod describes the tenant pod which has been added, modified, or deleted.
Pod pod = 1;
}

// ListPodsRequest is used to query the server for the list of current
// pods of a given tenant.
message ListPodsRequest {
// TenantID identifies the tenant for which the client is requesting a list of
// the pods.
uint64 tenant_id = 1[(gogoproto.customname) = "TenantID"];
}

// EnsurePodRequest is used to ensure that at least one tenant pod is in the
// RUNNING state.
message EnsurePodRequest {
// TenantID is the id of the tenant for which a RUNNING pod is requested.
uint64 tenant_id = 1[(gogoproto.customname) = "TenantID"];
uint64 tenant_id = 1 [(gogoproto.customname) = "TenantID"];
}

// EnsurePodResponse is empty and indicates that the server processed the
// request.
message EnsurePodResponse {
}

// ListPodsResponse is sent back as a result of requesting the list of pods for
// a given tenant.
message ListPodsResponse {
// Pods is the list of RUNNING and/or DRAINING pods for the requested tenant.
// It does not include DELETING pods.
repeated Pod pods = 1;
}

// GetTenantRequest is used by a client to request from the sever metadata
// related to a given tenant.
message GetTenantRequest {
// TenantID identifies the tenant for which the metadata is being requested.
uint64 tenant_id = 1[(gogoproto.customname) = "TenantID"];
uint64 tenant_id = 1 [(gogoproto.customname) = "TenantID"];
}

// GetTenantResponse is sent back when a client requests metadata for a tenant.
Expand Down
37 changes: 23 additions & 14 deletions pkg/ccl/sqlproxyccl/tenant/directory_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,11 @@ func NewDirectoryCache(
return dir, nil
}

// LookupTenantPods returns a list of SQL pods in the RUNNING state for the
// given tenant. If the tenant was just created or is suspended, such that there
// are no available processes, then LookupTenantPods will trigger resumption of a
// new instance and block until the process is ready.
// LookupTenantPods returns a list of SQL pods in the RUNNING and DRAINING
// states for the given tenant. If the tenant was just created or is suspended,
// such that there are no available RUNNING processes, then LookupTenantPods
// will trigger resumption of a new instance (or a conversion of a DRAINING pod
// to a RUNNING one) and block until that happens.
//
// If clusterName is non-empty, then a GRPC NotFound error is returned if no
// pods match the cluster name. This can be used to ensure that the incoming SQL
Expand Down Expand Up @@ -191,7 +192,15 @@ func (d *directoryCache) LookupTenantPods(

ctx, _ = d.stopper.WithCancelOnQuiesce(ctx)
tenantPods := entry.GetPods()
if len(tenantPods) == 0 {

// Trigger resumption if there are no RUNNING pods.
runningPods := make([]*Pod, 0, len(tenantPods))
for _, pod := range tenantPods {
if pod.State == RUNNING {
runningPods = append(runningPods, pod)
}
}
if len(runningPods) == 0 {
// There are no known pod IP addresses, so fetch pod information from
// the directory server. Resume the tenant if it is suspended; that
// will always result in at least one pod IP address (or an error).
Expand All @@ -206,12 +215,12 @@ func (d *directoryCache) LookupTenantPods(
return tenantPods, nil
}

// TryLookupTenantPods returns a list of SQL pods in the RUNNING state for the
// given tenant. It returns a GRPC NotFound error if the tenant does not exist
// (e.g. it has not yet been created) or if it has not yet been fetched into the
// directory's cache (TryLookupTenantPods will never attempt to fetch it). If no
// processes are available for the tenant, TryLookupTenantPods will return the
// empty set (unlike LookupTenantPod).
// TryLookupTenantPods returns a list of SQL pods in the RUNNING and DRAINING
// states for thegiven tenant. It returns a GRPC NotFound error if the tenant
// does not exist (e.g. it has not yet been created) or if it has not yet been
// fetched into the directory's cache (TryLookupTenantPods will never attempt to
// fetch it). If no processes are available for the tenant, TryLookupTenantPods
// will return the empty set (unlike LookupTenantPod).
//
// WARNING: Callers should never attempt to modify values returned by this
// method, or else they may be a race. Other instances may be reading from the
Expand Down Expand Up @@ -444,8 +453,8 @@ func (d *directoryCache) updateTenantEntry(ctx context.Context, pod *Pod) {
}

switch pod.State {
case RUNNING:
// Add entries of RUNNING pods if they are not already present.
case RUNNING, DRAINING:
// Add entries of RUNNING and DRAINING pods if they are not already present.
if entry.AddPod(pod) {
log.Infof(ctx, "added IP address %s with load %.3f for tenant %d", pod.Addr, pod.Load, pod.TenantID)
} else {
Expand All @@ -457,7 +466,7 @@ func (d *directoryCache) updateTenantEntry(ctx context.Context, pod *Pod) {
log.Infof(ctx, "updated IP address %s with load %.3f for tenant %d", pod.Addr, pod.Load, pod.TenantID)
}
default:
// Remove addresses of DRAINING and DELETING pods.
// Remove addresses of DELETING pods.
if entry.RemovePodByAddr(pod.Addr) {
log.Infof(ctx, "deleted IP address %s for tenant %d", pod.Addr, pod.TenantID)
}
Expand Down
33 changes: 20 additions & 13 deletions pkg/ccl/sqlproxyccl/tenant/directory_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,15 @@ func TestWatchPods(t *testing.T) {
require.Equal(t, tenantID.ToUint64(), pod.TenantID)
require.Equal(t, addr, pod.Addr)
require.Equal(t, tenant.RUNNING, pod.State)
require.False(t, pod.StateTimestamp.IsZero())

// Trigger drain of pod.
tds.Drain()
pod = <-podWatcher
require.Equal(t, tenantID.ToUint64(), pod.TenantID)
require.Equal(t, addr, pod.Addr)
require.Equal(t, tenant.DRAINING, pod.State)

// Ensure that all addresses have been cleared from the directory, since
// it should only return RUNNING addresses.
require.Eventually(t, func() bool {
tenantPods, _ := dir.TryLookupTenantPods(ctx, tenantID)
return len(tenantPods) == 0
}, 10*time.Second, 100*time.Millisecond)
require.False(t, pod.StateTimestamp.IsZero())

// Now shut the tenant directory down.
processes := tds.Get(tenantID)
Expand All @@ -121,13 +116,14 @@ func TestWatchPods(t *testing.T) {
require.Equal(t, tenantID.ToUint64(), pod.TenantID)
require.Equal(t, addr, pod.Addr)
require.Equal(t, tenant.DELETING, pod.State)
require.False(t, pod.StateTimestamp.IsZero())

// We know that the directory should have been emptied earlier since we
// don't add DRAINING pods to the directory, so putting the pod into the
// DELETING state should not make a difference.
pods, err = dir.TryLookupTenantPods(ctx, tenantID)
require.NoError(t, err)
require.Empty(t, pods)
// Ensure that all addresses have been cleared from the directory, since
// it should only return RUNNING or DRAINING addresses.
require.Eventually(t, func() bool {
tenantPods, _ := dir.TryLookupTenantPods(ctx, tenantID)
return len(tenantPods) == 0
}, 10*time.Second, 100*time.Millisecond)

// Resume tenant again by a direct call to the directory server
_, err = tds.EnsurePod(ctx, &tenant.EnsurePodRequest{tenantID.ToUint64()})
Expand All @@ -148,6 +144,7 @@ func TestWatchPods(t *testing.T) {
require.Equal(t, tenantID.ToUint64(), pod.TenantID)
require.Equal(t, addr, pod.Addr)
require.Equal(t, tenant.RUNNING, pod.State)
require.False(t, pod.StateTimestamp.IsZero())

// Verify that LookupTenantPods returns the pod's IP address.
pods, err = dir.LookupTenantPods(ctx, tenantID, "")
Expand Down Expand Up @@ -177,6 +174,7 @@ func TestWatchPods(t *testing.T) {
require.Equal(t, tenantID.ToUint64(), pod.TenantID)
require.Equal(t, addr, pod.Addr)
require.Equal(t, tenant.DELETING, pod.State)
require.False(t, pod.StateTimestamp.IsZero())

// Verify that a new call to LookupTenantPods will resume again the tenant.
pods, err = dir.LookupTenantPods(ctx, tenantID, "")
Expand All @@ -189,6 +187,7 @@ func TestWatchPods(t *testing.T) {
require.Equal(t, tenantID.ToUint64(), pod.TenantID)
require.Equal(t, addr, pod.Addr)
require.Equal(t, tenant.RUNNING, pod.State)
require.False(t, pod.StateTimestamp.IsZero())
}

func TestCancelLookups(t *testing.T) {
Expand Down Expand Up @@ -352,6 +351,10 @@ func TestRefreshThrottling(t *testing.T) {
require.NoError(t, dir.ReportFailure(ctx, tenantID, addr))
pods, err = dir.TryLookupTenantPods(ctx, tenantID)
require.NoError(t, err)
require.NotEmpty(t, pods)

// Reset StateTimestamp for deterministic comparison.
pods[0].StateTimestamp = time.Time{}
require.Equal(t, []*tenant.Pod{{
TenantID: tenantID.ToUint64(),
Addr: addr,
Expand All @@ -364,6 +367,10 @@ func TestRefreshThrottling(t *testing.T) {
require.NoError(t, dir.ReportFailure(ctx, tenantID, addr))
pods, err = dir.TryLookupTenantPods(ctx, tenantID)
require.NoError(t, err)
require.NotEmpty(t, pods)

// Reset StateTimestamp for deterministic comparison.
pods[0].StateTimestamp = time.Time{}
require.Equal(t, []*tenant.Pod{{
TenantID: tenantID.ToUint64(),
Addr: addr,
Expand Down
17 changes: 4 additions & 13 deletions pkg/ccl/sqlproxyccl/tenant/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,26 +247,17 @@ func (e *tenantEntry) fetchPodsLocked(
return nil, err
}

// Get updated list of RUNNING pod IP addresses and save it to the entry.
tenantPods = make([]*Pod, 0, len(list.Pods))
for i := range list.Pods {
pod := list.Pods[i]
if pod.State == RUNNING {
tenantPods = append(tenantPods, pod)
}
}

// Need to lock in case another thread is reading the IP addresses (e.g. in
// ChoosePodAddr).
e.pods.Lock()
defer e.pods.Unlock()
e.pods.pods = tenantPods
e.pods.pods = list.Pods

if len(tenantPods) != 0 {
log.Infof(ctx, "fetched IP addresses: %v", tenantPods)
if len(e.pods.pods) != 0 {
log.Infof(ctx, "fetched IP addresses: %v", e.pods.pods)
}

return tenantPods, nil
return e.pods.pods, nil
}

// canRefreshLocked returns true if it's been at least X milliseconds since the
Expand Down
Loading

0 comments on commit 5c78e85

Please sign in to comment.