From 5c78e85eab71d721b17260bd593ed56a07b55d23 Mon Sep 17 00:00:00 2001 From: Jay Date: Mon, 4 Apr 2022 15:30:18 -0400 Subject: [PATCH] ccl/sqlproxyccl: include DRAINING pods in the directory cache Previously, #67452 removed DRAINING pods from the directory cache. This commit adds that back. The connector will now need to filter for RUNNING pods manually before invoking the balancer. This is needed so that we could track DRAINING pods, and wait until 60 seconds have elapsed before transferring connections away from them. To support that, we also update the Pod's proto definition to include a StateTimestamp field to represent the timestamp at which the state field was last updated. The plan is to have a polling mechanism every X seconds to check DRAINING pods, and use that information to start migrating connections. Release note: None --- pkg/ccl/sqlproxyccl/connector.go | 15 +++-- pkg/ccl/sqlproxyccl/connector_test.go | 11 +++- pkg/ccl/sqlproxyccl/tenant/BUILD.bazel | 5 +- pkg/ccl/sqlproxyccl/tenant/directory.proto | 63 ++++++++++--------- pkg/ccl/sqlproxyccl/tenant/directory_cache.go | 37 ++++++----- .../tenant/directory_cache_test.go | 33 ++++++---- pkg/ccl/sqlproxyccl/tenant/entry.go | 17 ++--- .../tenantdirsvr/test_directory_svr.go | 41 ++++++------ .../tenantdirsvr/test_simple_directory_svr.go | 8 ++- 9 files changed, 130 insertions(+), 100 deletions(-) diff --git a/pkg/ccl/sqlproxyccl/connector.go b/pkg/ccl/sqlproxyccl/connector.go index b5dd5dfe075c..4971959473f8 100644 --- a/pkg/ccl/sqlproxyccl/connector.go +++ b/pkg/ccl/sqlproxyccl/connector.go @@ -275,14 +275,17 @@ func (c *connector) lookupAddr(ctx context.Context) (string, error) { pods, err := c.DirectoryCache.LookupTenantPods(ctx, c.TenantID, c.ClusterName) switch { case err == nil: - // Note that LookupTenantPods will always return RUNNING pods, so this - // is fine for now. If we start changing that to also return DRAINING - // pods, we'd have to filter accordingly. 
- pod, err := c.Balancer.SelectTenantPod(pods) + runningPods := make([]*tenant.Pod, 0, len(pods)) + for _, pod := range pods { + if pod.State == tenant.RUNNING { + runningPods = append(runningPods, pod) + } + } + pod, err := c.Balancer.SelectTenantPod(runningPods) if err != nil { // This should never happen because LookupTenantPods ensured that - // len(pods) should never be 0. Mark it as a retriable connection - // anyway. + // there should be at least one RUNNING pod. Mark it as a retriable + // connection anyway. return "", markAsRetriableConnectorError(err) } return pod.Addr, nil diff --git a/pkg/ccl/sqlproxyccl/connector_test.go b/pkg/ccl/sqlproxyccl/connector_test.go index f27ca95bee93..936b311f5569 100644 --- a/pkg/ccl/sqlproxyccl/connector_test.go +++ b/pkg/ccl/sqlproxyccl/connector_test.go @@ -496,7 +496,10 @@ func TestConnector_lookupAddr(t *testing.T) { require.Equal(t, ctx, fnCtx) require.Equal(t, c.TenantID, tenantID) require.Equal(t, c.ClusterName, clusterName) - return []*tenant.Pod{{Addr: "127.0.0.10:80"}}, nil + return []*tenant.Pod{ + {Addr: "127.0.0.10:70", State: tenant.DRAINING}, + {Addr: "127.0.0.10:80", State: tenant.RUNNING}, + }, nil }, } @@ -536,7 +539,11 @@ func TestConnector_lookupAddr(t *testing.T) { pods := make([]*tenant.Pod, 0, len(mu.pods)) for addr, load := range mu.pods { - pods = append(pods, &tenant.Pod{Addr: addr, Load: load}) + pods = append(pods, &tenant.Pod{ + Addr: addr, + Load: load, + State: tenant.RUNNING, + }) } return pods, nil }, diff --git a/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel b/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel index 0e2e01dcce5a..7b570b7d2d25 100644 --- a/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel +++ b/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel @@ -8,7 +8,10 @@ proto_library( srcs = ["directory.proto"], strip_import_prefix = "/pkg", visibility = ["//visibility:public"], - deps = ["@com_github_gogo_protobuf//gogoproto:gogo_proto"], + deps = [ + "@com_github_gogo_protobuf//gogoproto:gogo_proto", + 
"@com_google_protobuf//:timestamp_proto", + ], ) go_proto_library( diff --git a/pkg/ccl/sqlproxyccl/tenant/directory.proto b/pkg/ccl/sqlproxyccl/tenant/directory.proto index d22edd18ffe3..8862b9304071 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory.proto +++ b/pkg/ccl/sqlproxyccl/tenant/directory.proto @@ -11,9 +11,7 @@ package cockroach.ccl.sqlproxyccl.tenant; option go_package="tenant"; import "gogoproto/gogo.proto"; - -// WatchPodsRequest is empty as we want to get all notifications. -message WatchPodsRequest {} +import "google/protobuf/timestamp.proto"; // PodState gives the current state of a tenant pod, so that the proxy knows // how/where to route traffic. @@ -21,13 +19,13 @@ message WatchPodsRequest {} enum PodState { option (gogoproto.goproto_enum_prefix) = false; - // UNKNOWN indicates that the pod values being reported are from a - // potentially out of date source. UNKNOWN may be used to notify updates to - // pod values when the pod's state may be out of date by the time the update - // is processed. + // UNKNOWN indicates that the pod values being reported are from a potentially + // out of date source. UNKNOWN may be used to notify updates to pod values + // when the pod's state may be out of date by the time the update is processed. UNKNOWN = 0; // RUNNING indicates the pod may have active SQL connections and is ready to // accept new SQL connections. + // // NOTE: The proxy must still be prepared to retry connections against a // running pod in case of transient failures. RUNNING = 1; @@ -46,17 +44,38 @@ enum PodState { // location, and state. message Pod { // TenantID is the tenant that owns the pod. - uint64 tenant_id = 2[(gogoproto.customname) = "TenantID"]; - // Addr is the ip and port combo identifying the tenant pod, (e.g. + uint64 tenant_id = 2 [(gogoproto.customname) = "TenantID"]; + // addr is the ip and port combination identifying the tenant pod, (e.g. // 132.130.1.11:34576). 
- string Addr = 1; - // PodState gives the current status of the tenant pod. - PodState State = 3; + string addr = 1; + // state gives the current status of the tenant pod. + PodState state = 3; // Load is a number in the range [0, 1] indicating the current amount of load // experienced by this tenant pod. float Load = 4; + // StateTimestamp represents the timestamp that the state was last updated. + google.protobuf.Timestamp stateTimestamp = 5 [(gogoproto.nullable) = false, (gogoproto.stdtime) = true]; +} + +// ListPodsRequest is used to query the server for the list of current pods of +// a given tenant. +message ListPodsRequest { + // TenantID identifies the tenant for which the client is requesting a list of + // the pods. + uint64 tenant_id = 1 [(gogoproto.customname) = "TenantID"]; +} + +// ListPodsResponse is sent back as a result of requesting the list of pods for +// a given tenant. +message ListPodsResponse { + // Pods is the list of RUNNING and/or DRAINING pods for the requested tenant. + // It does not include DELETING pods. + repeated Pod pods = 1; } +// WatchPodsRequest is empty as we want to get all notifications. +message WatchPodsRequest {} + // WatchPodsResponse represents the notifications that the server sends to // its clients when clients want to monitor the directory server activity. message WatchPodsResponse { @@ -64,19 +83,11 @@ message WatchPodsResponse { Pod pod = 1; } -// ListPodsRequest is used to query the server for the list of current -// pods of a given tenant. -message ListPodsRequest { - // TenantID identifies the tenant for which the client is requesting a list of - // the pods. - uint64 tenant_id = 1[(gogoproto.customname) = "TenantID"]; -} - // EnsurePodRequest is used to ensure that at least one tenant pod is in the // RUNNING state. message EnsurePodRequest { // TenantID is the id of the tenant for which a RUNNING pod is requested. 
- uint64 tenant_id = 1[(gogoproto.customname) = "TenantID"]; + uint64 tenant_id = 1 [(gogoproto.customname) = "TenantID"]; } // EnsurePodResponse is empty and indicates that the server processed the @@ -84,19 +95,11 @@ message EnsurePodRequest { message EnsurePodResponse { } -// ListPodsResponse is sent back as a result of requesting the list of pods for -// a given tenant. -message ListPodsResponse { - // Pods is the list of RUNNING and/or DRAINING pods for the requested tenant. - // It does not include DELETING pods. - repeated Pod pods = 1; -} - // GetTenantRequest is used by a client to request from the sever metadata // related to a given tenant. message GetTenantRequest { // TenantID identifies the tenant for which the metadata is being requested. - uint64 tenant_id = 1[(gogoproto.customname) = "TenantID"]; + uint64 tenant_id = 1 [(gogoproto.customname) = "TenantID"]; } // GetTenantResponse is sent back when a client requests metadata for a tenant. diff --git a/pkg/ccl/sqlproxyccl/tenant/directory_cache.go b/pkg/ccl/sqlproxyccl/tenant/directory_cache.go index f01353a0c08e..8896639f910e 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory_cache.go +++ b/pkg/ccl/sqlproxyccl/tenant/directory_cache.go @@ -154,10 +154,11 @@ func NewDirectoryCache( return dir, nil } -// LookupTenantPods returns a list of SQL pods in the RUNNING state for the -// given tenant. If the tenant was just created or is suspended, such that there -// are no available processes, then LookupTenantPods will trigger resumption of a -// new instance and block until the process is ready. +// LookupTenantPods returns a list of SQL pods in the RUNNING and DRAINING +// states for the given tenant. If the tenant was just created or is suspended, +// such that there are no available RUNNING processes, then LookupTenantPods +// will trigger resumption of a new instance (or a conversion of a DRAINING pod +// to a RUNNING one) and block until that happens. 
// // If clusterName is non-empty, then a GRPC NotFound error is returned if no // pods match the cluster name. This can be used to ensure that the incoming SQL @@ -191,7 +192,15 @@ func (d *directoryCache) LookupTenantPods( ctx, _ = d.stopper.WithCancelOnQuiesce(ctx) tenantPods := entry.GetPods() - if len(tenantPods) == 0 { + + // Trigger resumption if there are no RUNNING pods. + runningPods := make([]*Pod, 0, len(tenantPods)) + for _, pod := range tenantPods { + if pod.State == RUNNING { + runningPods = append(runningPods, pod) + } + } + if len(runningPods) == 0 { // There are no known pod IP addresses, so fetch pod information from // the directory server. Resume the tenant if it is suspended; that // will always result in at least one pod IP address (or an error). @@ -206,12 +215,12 @@ func (d *directoryCache) LookupTenantPods( return tenantPods, nil } -// TryLookupTenantPods returns a list of SQL pods in the RUNNING state for the -// given tenant. It returns a GRPC NotFound error if the tenant does not exist -// (e.g. it has not yet been created) or if it has not yet been fetched into the -// directory's cache (TryLookupTenantPods will never attempt to fetch it). If no -// processes are available for the tenant, TryLookupTenantPods will return the -// empty set (unlike LookupTenantPod). +// TryLookupTenantPods returns a list of SQL pods in the RUNNING and DRAINING +// states for the given tenant. It returns a GRPC NotFound error if the tenant +// does not exist (e.g. it has not yet been created) or if it has not yet been +// fetched into the directory's cache (TryLookupTenantPods will never attempt to +// fetch it). If no processes are available for the tenant, TryLookupTenantPods +// will return the empty set (unlike LookupTenantPod). // // WARNING: Callers should never attempt to modify values returned by this // method, or else they may be a race. 
Other instances may be reading from the @@ -444,8 +453,8 @@ func (d *directoryCache) updateTenantEntry(ctx context.Context, pod *Pod) { } switch pod.State { - case RUNNING: - // Add entries of RUNNING pods if they are not already present. + case RUNNING, DRAINING: + // Add entries of RUNNING and DRAINING pods if they are not already present. if entry.AddPod(pod) { log.Infof(ctx, "added IP address %s with load %.3f for tenant %d", pod.Addr, pod.Load, pod.TenantID) } else { @@ -457,7 +466,7 @@ func (d *directoryCache) updateTenantEntry(ctx context.Context, pod *Pod) { log.Infof(ctx, "updated IP address %s with load %.3f for tenant %d", pod.Addr, pod.Load, pod.TenantID) } default: - // Remove addresses of DRAINING and DELETING pods. + // Remove addresses of DELETING pods. if entry.RemovePodByAddr(pod.Addr) { log.Infof(ctx, "deleted IP address %s for tenant %d", pod.Addr, pod.TenantID) } diff --git a/pkg/ccl/sqlproxyccl/tenant/directory_cache_test.go b/pkg/ccl/sqlproxyccl/tenant/directory_cache_test.go index c1618c5d8320..380df7ca2522 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory_cache_test.go +++ b/pkg/ccl/sqlproxyccl/tenant/directory_cache_test.go @@ -92,6 +92,7 @@ func TestWatchPods(t *testing.T) { require.Equal(t, tenantID.ToUint64(), pod.TenantID) require.Equal(t, addr, pod.Addr) require.Equal(t, tenant.RUNNING, pod.State) + require.False(t, pod.StateTimestamp.IsZero()) // Trigger drain of pod. tds.Drain() @@ -99,13 +100,7 @@ func TestWatchPods(t *testing.T) { require.Equal(t, tenantID.ToUint64(), pod.TenantID) require.Equal(t, addr, pod.Addr) require.Equal(t, tenant.DRAINING, pod.State) - - // Ensure that all addresses have been cleared from the directory, since - // it should only return RUNNING addresses. - require.Eventually(t, func() bool { - tenantPods, _ := dir.TryLookupTenantPods(ctx, tenantID) - return len(tenantPods) == 0 - }, 10*time.Second, 100*time.Millisecond) + require.False(t, pod.StateTimestamp.IsZero()) // Now shut the tenant directory down. 
processes := tds.Get(tenantID) @@ -121,13 +116,14 @@ func TestWatchPods(t *testing.T) { require.Equal(t, tenantID.ToUint64(), pod.TenantID) require.Equal(t, addr, pod.Addr) require.Equal(t, tenant.DELETING, pod.State) + require.False(t, pod.StateTimestamp.IsZero()) - // We know that the directory should have been emptied earlier since we - // don't add DRAINING pods to the directory, so putting the pod into the - // DELETING state should not make a difference. - pods, err = dir.TryLookupTenantPods(ctx, tenantID) - require.NoError(t, err) - require.Empty(t, pods) + // Ensure that all addresses have been cleared from the directory, since + // it should only return RUNNING or DRAINING addresses. + require.Eventually(t, func() bool { + tenantPods, _ := dir.TryLookupTenantPods(ctx, tenantID) + return len(tenantPods) == 0 + }, 10*time.Second, 100*time.Millisecond) // Resume tenant again by a direct call to the directory server _, err = tds.EnsurePod(ctx, &tenant.EnsurePodRequest{tenantID.ToUint64()}) @@ -148,6 +144,7 @@ func TestWatchPods(t *testing.T) { require.Equal(t, tenantID.ToUint64(), pod.TenantID) require.Equal(t, addr, pod.Addr) require.Equal(t, tenant.RUNNING, pod.State) + require.False(t, pod.StateTimestamp.IsZero()) // Verify that LookupTenantPods returns the pod's IP address. pods, err = dir.LookupTenantPods(ctx, tenantID, "") @@ -177,6 +174,7 @@ func TestWatchPods(t *testing.T) { require.Equal(t, tenantID.ToUint64(), pod.TenantID) require.Equal(t, addr, pod.Addr) require.Equal(t, tenant.DELETING, pod.State) + require.False(t, pod.StateTimestamp.IsZero()) // Verify that a new call to LookupTenantPods will resume again the tenant. 
pods, err = dir.LookupTenantPods(ctx, tenantID, "") @@ -189,6 +187,7 @@ func TestWatchPods(t *testing.T) { require.Equal(t, tenantID.ToUint64(), pod.TenantID) require.Equal(t, addr, pod.Addr) require.Equal(t, tenant.RUNNING, pod.State) + require.False(t, pod.StateTimestamp.IsZero()) } func TestCancelLookups(t *testing.T) { @@ -352,6 +351,10 @@ func TestRefreshThrottling(t *testing.T) { require.NoError(t, dir.ReportFailure(ctx, tenantID, addr)) pods, err = dir.TryLookupTenantPods(ctx, tenantID) require.NoError(t, err) + require.NotEmpty(t, pods) + + // Reset StateTimestamp for deterministic comparison. + pods[0].StateTimestamp = time.Time{} require.Equal(t, []*tenant.Pod{{ TenantID: tenantID.ToUint64(), Addr: addr, @@ -364,6 +367,10 @@ func TestRefreshThrottling(t *testing.T) { require.NoError(t, dir.ReportFailure(ctx, tenantID, addr)) pods, err = dir.TryLookupTenantPods(ctx, tenantID) require.NoError(t, err) + require.NotEmpty(t, pods) + + // Reset StateTimestamp for deterministic comparison. + pods[0].StateTimestamp = time.Time{} require.Equal(t, []*tenant.Pod{{ TenantID: tenantID.ToUint64(), Addr: addr, diff --git a/pkg/ccl/sqlproxyccl/tenant/entry.go b/pkg/ccl/sqlproxyccl/tenant/entry.go index 8e69164f7841..8d4fd6c5aa33 100644 --- a/pkg/ccl/sqlproxyccl/tenant/entry.go +++ b/pkg/ccl/sqlproxyccl/tenant/entry.go @@ -247,26 +247,17 @@ func (e *tenantEntry) fetchPodsLocked( return nil, err } - // Get updated list of RUNNING pod IP addresses and save it to the entry. - tenantPods = make([]*Pod, 0, len(list.Pods)) - for i := range list.Pods { - pod := list.Pods[i] - if pod.State == RUNNING { - tenantPods = append(tenantPods, pod) - } - } - // Need to lock in case another thread is reading the IP addresses (e.g. in // ChoosePodAddr). 
e.pods.Lock() defer e.pods.Unlock() - e.pods.pods = tenantPods + e.pods.pods = list.Pods - if len(tenantPods) != 0 { - log.Infof(ctx, "fetched IP addresses: %v", tenantPods) + if len(e.pods.pods) != 0 { + log.Infof(ctx, "fetched IP addresses: %v", e.pods.pods) } - return tenantPods, nil + return e.pods.pods, nil } // canRefreshLocked returns true if it's been at least X milliseconds since the diff --git a/pkg/ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go b/pkg/ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go index 97893e0f6172..857f8cb4193a 100644 --- a/pkg/ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go +++ b/pkg/ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go @@ -180,10 +180,11 @@ func (s *TestDirectoryServer) SetFakeLoad(id roachpb.TenantID, addr net.Addr, fa defer s.listen.RUnlock() s.notifyEventListenersLocked(&tenant.WatchPodsResponse{ Pod: &tenant.Pod{ - Addr: addr.String(), - TenantID: id.ToUint64(), - Load: fakeLoad, - State: tenant.UNKNOWN, + Addr: addr.String(), + TenantID: id.ToUint64(), + Load: fakeLoad, + State: tenant.UNKNOWN, + StateTimestamp: timeutil.Now(), }, }) } @@ -266,9 +267,10 @@ func (s *TestDirectoryServer) Drain() { defer s.listen.RUnlock() s.notifyEventListenersLocked(&tenant.WatchPodsResponse{ Pod: &tenant.Pod{ - TenantID: tenantID, - Addr: addr.String(), - State: tenant.DRAINING, + TenantID: tenantID, + Addr: addr.String(), + State: tenant.DRAINING, + StateTimestamp: timeutil.Now(), }, }) } @@ -341,10 +343,11 @@ func (s *TestDirectoryServer) listLocked( resp := tenant.ListPodsResponse{} for addr, proc := range processByAddr { resp.Pods = append(resp.Pods, &tenant.Pod{ - TenantID: req.TenantID, - Addr: addr.String(), - State: tenant.RUNNING, - Load: proc.FakeLoad, + TenantID: req.TenantID, + Addr: addr.String(), + State: tenant.RUNNING, + Load: proc.FakeLoad, + StateTimestamp: timeutil.Now(), }) } return &resp, nil @@ -362,10 +365,11 @@ func (s *TestDirectoryServer) registerInstanceLocked(tenantID uint64, process *P defer 
s.listen.RUnlock() s.notifyEventListenersLocked(&tenant.WatchPodsResponse{ Pod: &tenant.Pod{ - TenantID: tenantID, - Addr: process.SQL.String(), - State: tenant.RUNNING, - Load: process.FakeLoad, + TenantID: tenantID, + Addr: process.SQL.String(), + State: tenant.RUNNING, + Load: process.FakeLoad, + StateTimestamp: timeutil.Now(), }, }) } @@ -385,9 +389,10 @@ func (s *TestDirectoryServer) deregisterInstance(tenantID uint64, sql net.Addr) defer s.listen.RUnlock() s.notifyEventListenersLocked(&tenant.WatchPodsResponse{ Pod: &tenant.Pod{ - TenantID: tenantID, - Addr: sql.String(), - State: tenant.DELETING, + TenantID: tenantID, + Addr: sql.String(), + State: tenant.DELETING, + StateTimestamp: timeutil.Now(), }, }) } diff --git a/pkg/ccl/sqlproxyccl/tenantdirsvr/test_simple_directory_svr.go b/pkg/ccl/sqlproxyccl/tenantdirsvr/test_simple_directory_svr.go index dc178028d07f..6961a84407cf 100644 --- a/pkg/ccl/sqlproxyccl/tenantdirsvr/test_simple_directory_svr.go +++ b/pkg/ccl/sqlproxyccl/tenantdirsvr/test_simple_directory_svr.go @@ -14,6 +14,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/gogo/status" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -67,9 +68,10 @@ func (d *TestSimpleDirectoryServer) ListPods( return &tenant.ListPodsResponse{ Pods: []*tenant.Pod{ { - TenantID: req.TenantID, - Addr: d.podAddr, - State: tenant.RUNNING, + TenantID: req.TenantID, + Addr: d.podAddr, + State: tenant.RUNNING, + StateTimestamp: timeutil.Now(), }, }, }, nil