diff --git a/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel b/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel index a54eddaf5812..0033017b83bf 100644 --- a/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel +++ b/pkg/ccl/sqlproxyccl/tenant/BUILD.bazel @@ -25,6 +25,7 @@ go_library( srcs = [ "directory.go", "entry.go", + "pod.go", ":mocks_tenant", # keep ], embed = [":tenant_go_proto"], @@ -34,6 +35,7 @@ go_library( "//pkg/roachpb", "//pkg/util/grpcutil", "//pkg/util/log", + "//pkg/util/randutil", "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", @@ -61,9 +63,10 @@ go_test( srcs = [ "directory_test.go", "main_test.go", + "pod_test.go", ], + embed = [":tenant"], deps = [ - ":tenant", "//pkg/base", "//pkg/ccl", "//pkg/ccl/kvccl/kvtenantccl", diff --git a/pkg/ccl/sqlproxyccl/tenant/directory.go b/pkg/ccl/sqlproxyccl/tenant/directory.go index 3ab72d5c8c27..61a4dd37b127 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory.go +++ b/pkg/ccl/sqlproxyccl/tenant/directory.go @@ -185,7 +185,13 @@ func (d *Directory) LookupTenantAddrs( return nil, status.Errorf( codes.NotFound, "tenant %d not in directory cache", tenantID.ToUint64()) } - return entry.getPodAddrs(), nil + + tenantPods := entry.getPods() + addrs := make([]string, len(tenantPods)) + for i, pod := range tenantPods { + addrs[i] = pod.Addr + } + return addrs, nil } // ReportFailure should be called when attempts to connect to a particular SQL @@ -376,8 +382,7 @@ func (d *Directory) watchPods(ctx context.Context, stopper *stop.Stopper) error // watcher events. When a pod is created, destroyed, or modified, it updates the // tenant's entry to reflect that change. func (d *Directory) updateTenantEntry(ctx context.Context, pod *Pod) { - podAddr := pod.Addr - if podAddr == "" { + if pod.Addr == "" { // Nothing needs to be done if there is no IP address specified. 
return } @@ -393,15 +398,23 @@ func (d *Directory) updateTenantEntry(ctx context.Context, pod *Pod) { return } - if pod.State == RUNNING { - // Add addresses of RUNNING pods if they are not already present. - if entry.AddPodAddr(podAddr) { - log.Infof(ctx, "added IP address %s for tenant %d", podAddr, pod.TenantID) + switch pod.State { + case RUNNING: + // Add entries of RUNNING pods if they are not already present. + if entry.AddPod(pod) { + log.Infof(ctx, "added IP address %s with load %.3f for tenant %d", pod.Addr, pod.Load, pod.TenantID) + } else { + log.Infof(ctx, "updated IP address %s with load %.3f for tenant %d", pod.Addr, pod.Load, pod.TenantID) + } + // Update entries of UNKNOWN pods only if they are already present. + case UNKNOWN: + if entry.UpdatePod(pod) { + log.Infof(ctx, "updated IP address %s with load %.3f for tenant %d", pod.Addr, pod.Load, pod.TenantID) } - } else { + default: // Remove addresses of DRAINING and DELETING pods. - if entry.RemovePodAddr(podAddr) { - log.Infof(ctx, "deleted IP address %s for tenant %d", podAddr, pod.TenantID) + if entry.RemovePodByAddr(pod.Addr) { + log.Infof(ctx, "deleted IP address %s for tenant %d", pod.Addr, pod.TenantID) } } } diff --git a/pkg/ccl/sqlproxyccl/tenant/directory.pb.go b/pkg/ccl/sqlproxyccl/tenant/directory.pb.go index 0b7f37b313de..441b5342453f 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory.pb.go +++ b/pkg/ccl/sqlproxyccl/tenant/directory.pb.go @@ -5,6 +5,7 @@ package tenant import ( context "context" + encoding_binary "encoding/binary" fmt "fmt" _ "github.com/gogo/protobuf/gogoproto" proto "github.com/gogo/protobuf/proto" @@ -47,18 +48,25 @@ const ( // DELETING indicates that the pod is being terminated. This state is only // used by WatchPods. DELETING PodState = 2 + // UNKNOWN indicates that the pod values being reported are from a + // potentially out of date source. 
UNKNOWN may be used to notify updates to + // pod values when the pod's state may be out of date by the time the update + // is processed. + UNKNOWN PodState = 3 ) var PodState_name = map[int32]string{ 0: "RUNNING", 1: "DRAINING", 2: "DELETING", + 3: "UNKNOWN", } var PodState_value = map[string]int32{ "RUNNING": 0, "DRAINING": 1, "DELETING": 2, + "UNKNOWN": 3, } func (x PodState) String() string { @@ -112,6 +120,9 @@ type Pod struct { Addr string `protobuf:"bytes,1,opt,name=Addr,proto3" json:"Addr,omitempty"` // PodState gives the current status of the tenant pod. State PodState `protobuf:"varint,3,opt,name=State,proto3,enum=cockroach.ccl.sqlproxyccl.tenant.PodState" json:"State,omitempty"` + // Load is a number in the range [0, 1] indicating the current amount of load + // experienced by this tenant pod. + Load float32 `protobuf:"fixed32,4,opt,name=Load,proto3" json:"Load,omitempty"` } func (m *Pod) Reset() { *m = Pod{} } @@ -412,38 +423,40 @@ func init() { } var fileDescriptor_ec8b5028e8f2b222 = []byte{ - // 496 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x94, 0x41, 0x6f, 0xd3, 0x3c, - 0x1c, 0xc6, 0xe3, 0xa5, 0xef, 0xde, 0xe4, 0xdf, 0x0a, 0x5a, 0xc3, 0xa1, 0xea, 0x21, 0x84, 0x48, - 0xa0, 0xb2, 0x43, 0x0a, 0x9d, 0x04, 0x9a, 0x04, 0x12, 0x9b, 0x5a, 0x4d, 0x95, 0x4a, 0x55, 0x85, - 0x21, 0x24, 0x2e, 0x53, 0xb0, 0xad, 0xad, 0xa2, 0x8b, 0xd3, 0xd8, 0x45, 0xec, 0x0b, 0x20, 0x8e, - 0x7c, 0x07, 0xbe, 0xcc, 0x8e, 0x3b, 0xee, 0x84, 0x20, 0xfd, 0x22, 0x28, 0x76, 0x5a, 0x42, 0x27, - 0xb1, 0x94, 0x9b, 0xed, 0xf6, 0xe7, 0xe7, 0x79, 0xf2, 0x7f, 0x12, 0x78, 0x48, 0xc8, 0xb4, 0x23, - 0x66, 0xd3, 0x38, 0xe1, 0x9f, 0xce, 0xb3, 0xb5, 0x64, 0x51, 0x18, 0xc9, 0x0e, 0x9d, 0x24, 0x8c, - 0x48, 0x9e, 0x9c, 0xfb, 0x71, 0xc2, 0x25, 0xc7, 0x2e, 0xe1, 0xe4, 0x43, 0xc2, 0x43, 0x72, 0xea, - 0x13, 0x32, 0xf5, 0x0b, 0x84, 0xaf, 0x89, 0xd6, 0xdd, 0x13, 0x7e, 0xc2, 0xd5, 0x9f, 0x3b, 0xd9, - 0x4a, 0x73, 0x1e, 0x86, 0xfa, 0xdb, 0x50, 
0x92, 0xd3, 0x31, 0xa7, 0x22, 0x60, 0xb3, 0x39, 0x13, - 0xd2, 0xfb, 0x8c, 0xc0, 0x1c, 0x73, 0x8a, 0x1f, 0x81, 0xad, 0xd9, 0xe3, 0x09, 0x6d, 0x6e, 0xb9, - 0xa8, 0x5d, 0x39, 0xa8, 0xa5, 0xdf, 0xef, 0x59, 0x47, 0xea, 0x70, 0xd0, 0x0b, 0x2c, 0xfd, 0xf3, - 0x80, 0x62, 0x0c, 0x95, 0x7d, 0x4a, 0x93, 0x26, 0x72, 0x51, 0xdb, 0x0e, 0xd4, 0x1a, 0xbf, 0x84, - 0xff, 0x5e, 0xcb, 0x50, 0xb2, 0xa6, 0xe9, 0xa2, 0xf6, 0xad, 0xee, 0x8e, 0x7f, 0x93, 0x45, 0x7f, - 0xcc, 0xa9, 0x22, 0x02, 0x0d, 0x7a, 0x43, 0x68, 0x14, 0xcc, 0x89, 0x98, 0x47, 0x82, 0xe1, 0x67, - 0x60, 0xc6, 0x9c, 0x2a, 0xa5, 0x6a, 0xf7, 0x41, 0xa9, 0x4b, 0x83, 0x8c, 0xf0, 0x9e, 0xc3, 0xed, - 0xe1, 0x44, 0xc8, 0x42, 0xd2, 0x3f, 0x13, 0xa2, 0xbf, 0x25, 0xf4, 0x5e, 0x40, 0xbd, 0x1f, 0x89, - 0x79, 0xc2, 0xb2, 0xfb, 0x36, 0xc7, 0xef, 0x40, 0xa3, 0x80, 0xeb, 0x28, 0xde, 0x2b, 0xa8, 0xff, - 0x76, 0x94, 0xc7, 0xdb, 0x83, 0x4a, 0xcc, 0xa9, 0x68, 0x22, 0xd7, 0x2c, 0x9f, 0x4f, 0x21, 0x99, - 0xc5, 0x43, 0x26, 0xb5, 0xf8, 0x3f, 0x58, 0x7c, 0x0a, 0x8d, 0x02, 0x9e, 0xdb, 0xb9, 0x0f, 0x35, - 0x32, 0x9d, 0x0b, 0xc9, 0x92, 0xe3, 0x28, 0x3c, 0x63, 0xf9, 0x80, 0xab, 0xf9, 0xd9, 0x28, 0x3c, - 0x63, 0x3b, 0x7b, 0x60, 0x2d, 0x07, 0x87, 0xab, 0xf0, 0x7f, 0xf0, 0x66, 0x34, 0x1a, 0x8c, 0x0e, - 0xeb, 0x06, 0xae, 0x81, 0xd5, 0x0b, 0xf6, 0x07, 0x6a, 0x87, 0xd4, 0xae, 0x3f, 0xec, 0x1f, 0x65, - 0xbb, 0xad, 0x56, 0xe5, 0xcb, 0x37, 0xc7, 0xe8, 0xa6, 0x26, 0xd8, 0xbd, 0x65, 0x93, 0xf1, 0x0c, - 0xac, 0xe5, 0xe3, 0xc0, 0x4f, 0x6e, 0x0e, 0xbe, 0x36, 0xcc, 0x56, 0x77, 0x13, 0x24, 0x8f, 0xf7, - 0x11, 0xec, 0x55, 0xc3, 0x70, 0x89, 0x0b, 0xd6, 0xdf, 0x95, 0xd6, 0xee, 0x46, 0x8c, 0x56, 0x7d, - 0x8c, 0xb0, 0x04, 0x7b, 0x55, 0x87, 0x32, 0xba, 0xeb, 0xd5, 0x2b, 0xa3, 0x7b, 0xad, 0x6f, 0x99, - 0xea, 0x6a, 0xc2, 0x65, 0x54, 0xd7, 0xdb, 0x54, 0x46, 0xf5, 0x5a, 0x85, 0x0e, 0xda, 0x17, 0x3f, - 0x1d, 0xe3, 0x22, 0x75, 0xd0, 0x65, 0xea, 0xa0, 0xab, 0xd4, 0x41, 0x3f, 0x52, 0x07, 0x7d, 0x5d, - 0x38, 0xc6, 0xe5, 0xc2, 0x31, 0xae, 0x16, 0x8e, 0xf1, 0x6e, 0x5b, 0xb3, 0xef, 
0xb7, 0xd5, 0x37, - 0x69, 0xf7, 0x57, 0x00, 0x00, 0x00, 0xff, 0xff, 0xc2, 0x26, 0xd9, 0x8e, 0xf5, 0x04, 0x00, 0x00, + // 517 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x94, 0x41, 0x6f, 0xd3, 0x30, + 0x14, 0xc7, 0xe3, 0xa6, 0x8c, 0xf6, 0xb5, 0x82, 0xd4, 0x70, 0x88, 0x7a, 0x08, 0x21, 0x12, 0x28, + 0xec, 0x90, 0x42, 0x27, 0x81, 0x90, 0x40, 0x62, 0x53, 0xcb, 0x54, 0x51, 0x42, 0x15, 0x36, 0x4d, + 0xe2, 0x32, 0x05, 0xdb, 0xda, 0x2a, 0xba, 0x38, 0x4d, 0x5c, 0xc4, 0xbe, 0x01, 0x47, 0xce, 0x5c, + 0xf9, 0x32, 0x3b, 0xee, 0xb8, 0x13, 0x82, 0xf4, 0x8b, 0xa0, 0x38, 0x69, 0x09, 0x9d, 0xc4, 0xd2, + 0xdd, 0x9e, 0x5d, 0xff, 0xfc, 0x7f, 0x7f, 0xbf, 0x7f, 0x03, 0x0f, 0x09, 0x99, 0x74, 0xe2, 0xe9, + 0x24, 0x8c, 0xf8, 0x97, 0xd3, 0xb4, 0x16, 0x2c, 0xf0, 0x03, 0xd1, 0xa1, 0xe3, 0x88, 0x11, 0xc1, + 0xa3, 0x53, 0x27, 0x8c, 0xb8, 0xe0, 0xd8, 0x24, 0x9c, 0x7c, 0x8a, 0xb8, 0x4f, 0x8e, 0x1d, 0x42, + 0x26, 0x4e, 0x81, 0x70, 0x32, 0xa2, 0x7d, 0xf7, 0x88, 0x1f, 0x71, 0x79, 0xb8, 0x93, 0x56, 0x19, + 0x67, 0x61, 0xd0, 0x0e, 0x7c, 0x41, 0x8e, 0x47, 0x9c, 0xc6, 0x1e, 0x9b, 0xce, 0x58, 0x2c, 0xac, + 0xef, 0x08, 0xd4, 0x11, 0xa7, 0xf8, 0x11, 0xd4, 0x33, 0xf6, 0x70, 0x4c, 0xf5, 0x8a, 0x89, 0xec, + 0xea, 0x4e, 0x33, 0xf9, 0x79, 0xaf, 0xb6, 0x27, 0x37, 0x07, 0x3d, 0xaf, 0x96, 0xfd, 0x3c, 0xa0, + 0x18, 0x43, 0x75, 0x9b, 0xd2, 0x48, 0x47, 0x26, 0xb2, 0xeb, 0x9e, 0xac, 0xf1, 0x2b, 0xb8, 0xf1, + 0x5e, 0xf8, 0x82, 0xe9, 0xaa, 0x89, 0xec, 0x5b, 0xdd, 0x4d, 0xe7, 0xaa, 0x16, 0x9d, 0x11, 0xa7, + 0x92, 0xf0, 0x32, 0x30, 0xbd, 0x75, 0xc8, 0x7d, 0xaa, 0x57, 0x4d, 0x64, 0x57, 0x3c, 0x59, 0x5b, + 0x43, 0x68, 0x15, 0x1a, 0x8e, 0x43, 0x1e, 0xc4, 0x0c, 0x3f, 0x03, 0x35, 0xe4, 0x54, 0xaa, 0x37, + 0xba, 0x0f, 0x4a, 0x09, 0x79, 0x29, 0x61, 0xbd, 0x80, 0xdb, 0xc3, 0x71, 0x2c, 0x0a, 0xee, 0xff, + 0x75, 0x8d, 0xfe, 0xe7, 0xda, 0x7a, 0x09, 0x5a, 0x3f, 0x88, 0x67, 0x11, 0x4b, 0xef, 0x5b, 0x1f, + 0xbf, 0x03, 0xad, 0x02, 0x9e, 0x59, 0xb1, 0xde, 0x82, 0xf6, 0xb7, 
0xa3, 0xdc, 0xde, 0x73, 0xa8, + 0x86, 0x9c, 0xc6, 0x3a, 0x32, 0xd5, 0xf2, 0xfe, 0x24, 0x92, 0xb6, 0xb8, 0xcb, 0x44, 0x26, 0x7e, + 0x8d, 0x16, 0x9f, 0x42, 0xab, 0x80, 0xe7, 0xed, 0xdc, 0x87, 0x26, 0x99, 0xcc, 0x62, 0xc1, 0xa2, + 0xc3, 0xc0, 0x3f, 0x61, 0xf9, 0xd0, 0x1b, 0xf9, 0x9e, 0xeb, 0x9f, 0xb0, 0xcd, 0xd7, 0x50, 0x5b, + 0x0c, 0x13, 0x37, 0xe0, 0xa6, 0xb7, 0xef, 0xba, 0x03, 0x77, 0x57, 0x53, 0x70, 0x13, 0x6a, 0x3d, + 0x6f, 0x7b, 0x20, 0x57, 0x48, 0xae, 0xfa, 0xc3, 0xfe, 0x5e, 0xba, 0xaa, 0xa4, 0x07, 0xf7, 0xdd, + 0x37, 0xee, 0xbb, 0x03, 0x57, 0x53, 0xdb, 0xd5, 0xaf, 0x3f, 0x0c, 0xa5, 0x9b, 0xa8, 0x50, 0xef, + 0x2d, 0xa2, 0x8e, 0xa7, 0x50, 0x5b, 0xbc, 0x0d, 0x7e, 0x72, 0xf5, 0x2b, 0xac, 0x4c, 0xb6, 0xdd, + 0x5d, 0x07, 0xc9, 0xbd, 0x7e, 0x86, 0xfa, 0x32, 0x6e, 0xb8, 0xc4, 0x05, 0xab, 0x7f, 0xa6, 0xf6, + 0xd6, 0x5a, 0x4c, 0xa6, 0xfa, 0x18, 0x61, 0x01, 0xf5, 0x65, 0x36, 0xca, 0xe8, 0xae, 0xe6, 0xb0, + 0x8c, 0xee, 0xa5, 0xf0, 0xa5, 0xaa, 0xcb, 0x71, 0x97, 0x51, 0x5d, 0x8d, 0x56, 0x19, 0xd5, 0x4b, + 0x79, 0xda, 0xb1, 0xcf, 0x7e, 0x1b, 0xca, 0x59, 0x62, 0xa0, 0xf3, 0xc4, 0x40, 0x17, 0x89, 0x81, + 0x7e, 0x25, 0x06, 0xfa, 0x36, 0x37, 0x94, 0xf3, 0xb9, 0xa1, 0x5c, 0xcc, 0x0d, 0xe5, 0xc3, 0x46, + 0xc6, 0x7e, 0xdc, 0x90, 0x1f, 0xad, 0xad, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x21, 0x0f, 0x28, + 0xb4, 0x16, 0x05, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. 
@@ -731,6 +744,12 @@ func (m *Pod) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Load != 0 { + i -= 4 + encoding_binary.LittleEndian.PutUint32(dAtA[i:], uint32(math.Float32bits(float32(m.Load)))) + i-- + dAtA[i] = 0x25 + } if m.State != 0 { i = encodeVarintDirectory(dAtA, i, uint64(m.State)) i-- @@ -996,6 +1015,9 @@ func (m *Pod) Size() (n int) { if m.State != 0 { n += 1 + sovDirectory(uint64(m.State)) } + if m.Load != 0 { + n += 5 + } return n } @@ -1240,6 +1262,17 @@ func (m *Pod) Unmarshal(dAtA []byte) error { break } } + case 4: + if wireType != 5 { + return fmt.Errorf("proto: wrong wireType = %d for field Load", wireType) + } + var v uint32 + if (iNdEx + 4) > l { + return io.ErrUnexpectedEOF + } + v = uint32(encoding_binary.LittleEndian.Uint32(dAtA[iNdEx:])) + iNdEx += 4 + m.Load = float32(math.Float32frombits(v)) default: iNdEx = preIndex skippy, err := skipDirectory(dAtA[iNdEx:]) diff --git a/pkg/ccl/sqlproxyccl/tenant/directory.proto b/pkg/ccl/sqlproxyccl/tenant/directory.proto index 10416ace9028..04acb616b8e7 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory.proto +++ b/pkg/ccl/sqlproxyccl/tenant/directory.proto @@ -35,6 +35,11 @@ enum PodState { // DELETING indicates that the pod is being terminated. This state is only // used by WatchPods. DELETING = 2; + // UNKNOWN indicates that the pod values being reported are from a + // potentially out of date source. UNKNOWN may be used to notify updates to + // pod values when the pod's state may be out of date by the time the update + // is processed. + UNKNOWN = 3; } // Pod contains information about a tenant pod, such as its tenant owner, @@ -47,6 +52,9 @@ message Pod { string Addr = 1; // PodState gives the current status of the tenant pod. PodState State = 3; + // Load is a number in the range [0, 1] indicating the current amount of load + // experienced by this tenant pod. 
+ float Load = 4; } // WatchPodsResponse represents the notifications that the server sends to diff --git a/pkg/ccl/sqlproxyccl/tenant/directory_test.go b/pkg/ccl/sqlproxyccl/tenant/directory_test.go index 00a34e2dd025..f0390e84834e 100644 --- a/pkg/ccl/sqlproxyccl/tenant/directory_test.go +++ b/pkg/ccl/sqlproxyccl/tenant/directory_test.go @@ -350,6 +350,104 @@ func TestRefreshThrottling(t *testing.T) { require.Equal(t, []string{addr}, addrs) } +func TestLoadBalancing(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.ScopeWithoutShowLogs(t).Close(t) + + // Make pod watcher channel. + podWatcher := make(chan *tenant.Pod, 10) + + // Create the directory. + ctx := context.Background() + tc, dir, tds := newTestDirectory(t, tenant.PodWatcher(podWatcher)) + defer tc.Stopper().Stop(ctx) + + tenantID := roachpb.MakeTenantID(30) + require.NoError(t, createTenant(tc, tenantID)) + + // Forcibly start two instances of tenantID. + require.NoError(t, tds.StartTenant(ctx, tenantID)) + <-podWatcher + + require.NoError(t, tds.StartTenant(ctx, tenantID)) + <-podWatcher + + // Ensure that instance 2 has started. + processes := tds.Get(tenantID) + require.NotNil(t, processes) + require.Len(t, processes, 2) + + // Wait for the background watcher to populate both pods. + require.Eventually(t, func() bool { + addrs, _ := dir.LookupTenantAddrs(ctx, tenantID) + return len(addrs) == 2 + }, 10*time.Second, 100*time.Millisecond) + + var processAddrs []net.Addr + for k := range processes { + processAddrs = append(processAddrs, k) + } + + // Both tenants will have the same initial (and fake) load reporting. + // Observe that EnsureTenantAddr evenly distributes load across them. + responses := map[string]int{} + for i := 0; i < 100; i++ { + addr, err := dir.EnsureTenantAddr(ctx, tenantID, "") + require.NoError(t, err) + responses[addr] += 1 + } + + // Assert that the distribution is roughly 50/50. 
+ require.InDelta(t, responses[processAddrs[0].String()], 50, 15) + require.InDelta(t, responses[processAddrs[1].String()], 50, 15) + + // Adjust load to 0.25/0.75 such that the distribution will be a 75/25 split. + tds.SetFakeLoad(tenantID, processes[processAddrs[0]].SQL, 0.25) + pod := <-podWatcher + require.Equal(t, float32(0.25), pod.Load) + require.Equal(t, processes[processAddrs[0]].SQL.String(), pod.Addr) + + tds.SetFakeLoad(tenantID, processes[processAddrs[1]].SQL, 0.75) + pod = <-podWatcher + require.Equal(t, float32(0.75), pod.Load) + require.Equal(t, processes[processAddrs[1]].SQL.String(), pod.Addr) + + // There's no way to synchronize on the directory updating its internal + // state. It requires 2 loop iterations, so 250ms should be more than + // enough. + time.Sleep(250 * time.Millisecond) + + responses = map[string]int{} + for i := 0; i < 100; i++ { + addr, err := dir.EnsureTenantAddr(ctx, tenantID, "") + require.NoError(t, err) + responses[addr] += 1 + } + + // Observe that the distribution is now a 3/1 split. + require.InDelta(t, responses[processAddrs[0].String()], 75, 15) + require.InDelta(t, responses[processAddrs[1].String()], 25, 15) + + // Stop the running tenants. + for _, process := range processes { + process.Stopper.Stop(ctx) + } + + // Wait for the tenantdir to reflect that no tenants are running any more. + require.Eventually(t, func() bool { + addrs, _ := dir.LookupTenantAddrs(ctx, tenantID) + return len(addrs) == 0 + }, 10*time.Second, 100*time.Millisecond) + + // Set the load on a deleted tenant. + tds.SetFakeLoad(tenantID, processes[processAddrs[0]].SQL, 0.8) + _ = <-podWatcher + + // And ensure that the deleted entry does not get resurrected. 
+ addrs, _ := dir.LookupTenantAddrs(ctx, tenantID) + require.Len(t, addrs, 0) +} + func createTenant(tc serverutils.TestClusterInterface, id roachpb.TenantID) error { srv := tc.Server(0) conn := srv.InternalExecutor().(*sql.InternalExecutor) diff --git a/pkg/ccl/sqlproxyccl/tenant/entry.go b/pkg/ccl/sqlproxyccl/tenant/entry.go index 58b6f19a4f97..02be6ce0060c 100644 --- a/pkg/ccl/sqlproxyccl/tenant/entry.go +++ b/pkg/ccl/sqlproxyccl/tenant/entry.go @@ -11,11 +11,13 @@ package tenant import ( "context" "fmt" + "math/rand" "sync" "time" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" ) @@ -52,10 +54,8 @@ type tenantEntry struct { // accessing them. pods struct { syncutil.Mutex - - // addrs is the set of IP:port addresses of the tenant's currently - // RUNNING pods. - addrs []string + rng *rand.Rand + pods []*Pod } // calls synchronizes calls to the Directory service for this tenant (e.g. @@ -84,6 +84,7 @@ func (e *tenantEntry) Initialize(ctx context.Context, client DirectoryClient) er } e.ClusterName = tenantResp.ClusterName + e.pods.rng, _ = randutil.NewPseudoRand() }) // If Initialize has already been called, return any error that occurred. @@ -112,69 +113,104 @@ func (e *tenantEntry) RefreshPods(ctx context.Context, client DirectoryClient) e // ChoosePodAddr returns the IP address of one of this tenant's available pods. // If a tenant has multiple pods, then ChoosePodAddr returns the IP address of -// one of those pods. If the tenant is suspended and no pods are available, then -// ChoosePodAddr will trigger resumption of the tenant and return the IP address -// of the new pod. Note that resuming a tenant requires directory server calls, -// so ChoosePodAddr can block for some time, until the resumption process is -// complete. 
However, if errorIfNoPods is true, then ChoosePodAddr returns an -// error if there are no pods available rather than blocking. -// -// TODO(andyk): Use better load-balancing algorithm once tenants can have more -// than one pod. +// one of those pods based on the pods' reported load. If the tenant is +// suspended and no pods are available, then ChoosePodAddr will trigger +// resumption of the tenant and return the IP address of the new pod. Note that +// resuming a tenant requires directory server calls, so ChoosePodAddr can +// block for some time, until the resumption process is complete. However, if +// errorIfNoPods is true, then ChoosePodAddr returns an error if there are no +// pods available rather than blocking. func (e *tenantEntry) ChoosePodAddr( ctx context.Context, client DirectoryClient, errorIfNoPods bool, ) (string, error) { - addrs := e.getPodAddrs() - if len(addrs) == 0 { + pods := e.getPods() + if len(pods) == 0 { // There are no known pod IP addresses, so fetch pod information // from the directory server. Resume the tenant if it is suspended; that // will always result in at least one pod IP address (or an error). var err error - if addrs, err = e.ensureTenantPod(ctx, client, errorIfNoPods); err != nil { + if pods, err = e.ensureTenantPod(ctx, client, errorIfNoPods); err != nil { return "", err } } - return addrs[0], nil + + return selectTenantPod(e.randFloat32(), pods).Addr, nil } -// AddPodAddr inserts the given IP address into the tenant's list of pod IPs. If -// it is already present, then AddPodAddr returns false. -func (e *tenantEntry) AddPodAddr(addr string) bool { +// randFloat32 generates a random float32 within the bounds [0, 1) and is +// thread safe. +func (e *tenantEntry) randFloat32() float32 { e.pods.Lock() defer e.pods.Unlock() + return e.pods.rng.Float32() +} - for _, existing := range e.pods.addrs { - if existing == addr { +// AddPod inserts the given pod into the tenant's list of pods. 
If it is +// already present, then AddPod updates the pod entry and returns false. +func (e *tenantEntry) AddPod(pod *Pod) bool { + e.pods.Lock() + defer e.pods.Unlock() + + for i, existing := range e.pods.pods { + if existing.Addr == pod.Addr { + // e.pods.pods is copy on write. Whenever modifications are made, + // we must make a copy to avoid accidentally mutating the slice + // retrieved by getPods. + pods := e.pods.pods + e.pods.pods = make([]*Pod, len(pods)) + copy(e.pods.pods, pods) + e.pods.pods[i] = pod return false } } - e.pods.addrs = append(e.pods.addrs, addr) + e.pods.pods = append(e.pods.pods, pod) return true } -// RemovePodAddr removes the given IP address from the tenant's list of pod -// addresses. If it was not present, RemovePodAddr returns false. -func (e *tenantEntry) RemovePodAddr(addr string) bool { +// UpdatePod updates the given pod in the tenant's list of pods. If an entry +// with a matching Addr is not present, UpdatePod returns false. +func (e *tenantEntry) UpdatePod(pod *Pod) bool { + e.pods.Lock() + defer e.pods.Unlock() + + for i, existing := range e.pods.pods { + if existing.Addr == pod.Addr { + // e.pods.pods is copy on write. Whenever modifications are made, + // we must make a copy to avoid accidentally mutating the slice + // retrieved by getPods. + pods := e.pods.pods + e.pods.pods = make([]*Pod, len(pods)) + copy(e.pods.pods, pods) + e.pods.pods[i] = pod + return true + } + } + + return false +} + +// RemovePodByAddr removes the pod with the given IP address from the tenant's +// list of pods. If it was not present, RemovePodByAddr returns false. 
+func (e *tenantEntry) RemovePodByAddr(addr string) bool { e.pods.Lock() defer e.pods.Unlock() - for i, existing := range e.pods.addrs { - if existing == addr { - copy(e.pods.addrs[i:], e.pods.addrs[i+1:]) - e.pods.addrs = e.pods.addrs[:len(e.pods.addrs)-1] + for i, existing := range e.pods.pods { + if existing.Addr == addr { + copy(e.pods.pods[i:], e.pods.pods[i+1:]) + e.pods.pods = e.pods.pods[:len(e.pods.pods)-1] return true } } return false } -// getPodAddrs gets the current list of pod IP addresses within scope of lock -// and returns them. -func (e *tenantEntry) getPodAddrs() []string { +// getPods gets the current list of pods within scope of lock and returns them. +func (e *tenantEntry) getPods() []*Pod { e.pods.Lock() defer e.pods.Unlock() - return e.pods.addrs + return e.pods.pods } // ensureTenantPod ensures that at least one SQL process exists for this tenant, @@ -183,7 +219,7 @@ func (e *tenantEntry) getPodAddrs() []string { // rather than blocking. func (e *tenantEntry) ensureTenantPod( ctx context.Context, client DirectoryClient, errorIfNoPods bool, -) (addrs []string, err error) { +) (pods []*Pod, err error) { const retryDelay = 100 * time.Millisecond e.calls.Lock() @@ -192,9 +228,9 @@ func (e *tenantEntry) ensureTenantPod( // If an IP address is already available, nothing more to do. Check this // immediately after obtaining the lock so that only the first thread does // the work to get information about the tenant. - addrs = e.getPodAddrs() - if len(addrs) != 0 { - return addrs, nil + pods = e.getPods() + if len(pods) != 0 { + return pods, nil } for { @@ -213,11 +249,11 @@ func (e *tenantEntry) ensureTenantPod( // race conditions, this is expected to immediately find an IP address, // since the above call started a tenant process that already has an IP // address. 
- addrs, err = e.fetchPodsLocked(ctx, client) + pods, err = e.fetchPodsLocked(ctx, client) if err != nil { return nil, err } - if len(addrs) != 0 { + if len(pods) != 0 { log.Infof(ctx, "resumed tenant %d", e.TenantID) break } @@ -230,7 +266,7 @@ func (e *tenantEntry) ensureTenantPod( sleepContext(ctx, retryDelay) } - return addrs, nil + return pods, nil } // fetchPodsLocked makes a synchronous directory server call to get the latest @@ -239,7 +275,7 @@ func (e *tenantEntry) ensureTenantPod( // NOTE: Caller must lock the "calls" mutex before calling fetchPodsLocked. func (e *tenantEntry) fetchPodsLocked( ctx context.Context, client DirectoryClient, -) (addrs []string, err error) { +) (tenantPods []*Pod, err error) { // List the pods for the given tenant. // TODO(andyk): This races with the pod watcher, which may receive updates // that are newer than what ListPods returns. This could be fixed by adding @@ -250,11 +286,11 @@ func (e *tenantEntry) fetchPodsLocked( } // Get updated list of RUNNING pod IP addresses and save it to the entry. - addrs = make([]string, 0, len(list.Pods)) + tenantPods = make([]*Pod, 0, len(list.Pods)) for i := range list.Pods { pod := list.Pods[i] if pod.State == RUNNING { - addrs = append(addrs, pod.Addr) + tenantPods = append(tenantPods, pod) } } @@ -262,13 +298,13 @@ func (e *tenantEntry) fetchPodsLocked( // ChoosePodAddr). e.pods.Lock() defer e.pods.Unlock() - e.pods.addrs = addrs + e.pods.pods = tenantPods - if len(addrs) != 0 { - log.Infof(ctx, "fetched IP addresses: %v", addrs) + if len(tenantPods) != 0 { + log.Infof(ctx, "fetched IP addresses: %v", tenantPods) } - return addrs, nil + return tenantPods, nil } // canRefreshLocked returns true if it's been at least X milliseconds since the diff --git a/pkg/ccl/sqlproxyccl/tenant/pod.go b/pkg/ccl/sqlproxyccl/tenant/pod.go new file mode 100644 index 000000000000..80f638b3386d --- /dev/null +++ b/pkg/ccl/sqlproxyccl/tenant/pod.go @@ -0,0 +1,43 @@ +// Copyright 2021 The Cockroach Authors. 
+// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package tenant + +// selectTenantPod selects a tenant pod from the given list to receive +// incoming traffic. Pods are weighted by their reported CPU load. rand must be +// a pseudo random number within the bounds [0, 1). It is suggested to use +// Float32() of a PseudoRand instance that is guarded by a mutex. +// +// rngMu.Lock() +// rand := rng.Float32() +// rngMu.Unlock() +// selectTenantPod(rand, pods) +func selectTenantPod(rand float32, pods []*Pod) *Pod { + if len(pods) == 1 { + return pods[0] + } + + totalLoad := float32(0) + for _, pod := range pods { + totalLoad += 1 - pod.Load + } + + totalLoad *= rand + + for _, pod := range pods { + totalLoad -= 1 - pod.Load + if totalLoad < 0 { + return pod + } + } + + // This is only reached when every pod has zero weight (Load == 1), when + // float32 rounding leaves the remainder non-negative, or when a Load is + // outside [0, 1]. We fall back to the final pod in the list. + return pods[len(pods)-1] +} diff --git a/pkg/ccl/sqlproxyccl/tenant/pod_test.go b/pkg/ccl/sqlproxyccl/tenant/pod_test.go new file mode 100644 index 000000000000..929290c33ce9 --- /dev/null +++ b/pkg/ccl/sqlproxyccl/tenant/pod_test.go @@ -0,0 +1,42 @@ +// Copyright 2021 The Cockroach Authors. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package tenant + +import ( + "math/rand" + "testing" + + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/stretchr/testify/require" +) + +func TestSelectTenantPods(t *testing.T) { + defer leaktest.AfterTest(t)() + + pods := []*Pod{ + {Addr: "1", Load: 0.0}, + {Addr: "2", Load: 0.5}, + {Addr: "3", Load: 0.9}, + } + + distribution := map[string]int{} + rng := rand.New(rand.NewSource(0)) + + for i := 0; i < 10000; i++ { + pod := selectTenantPod(rng.Float32(), pods) + distribution[pod.Addr]++ + } + + // Assert that the distribution is roughly proportional to 1 - Load. + require.Equal(t, map[string]int{ + "1": 6121, + "2": 3214, + "3": 665, + }, distribution) +} diff --git a/pkg/ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go b/pkg/ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go index e21da476c7e1..0773dd42498e 100644 --- a/pkg/ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go +++ b/pkg/ccl/sqlproxyccl/tenantdirsvr/test_directory_svr.go @@ -33,9 +33,10 @@ var _ tenant.DirectoryServer = (*TestDirectoryServer)(nil) // Process stores information about a running tenant process. type Process struct { - Stopper *stop.Stopper - Cmd *exec.Cmd - SQL net.Addr + Stopper *stop.Stopper + Cmd *exec.Cmd + SQL net.Addr + FakeLoad float32 } // NewSubStopper creates a new stopper that will be stopped when either the @@ -123,6 +124,61 @@ func (s *TestDirectoryServer) Get(id roachpb.TenantID) (result map[net.Addr]*Pro return } +// StartTenant will forcefully start a new tenant pod +// instance. This may be useful to test the behavior when more +// than one pod is running for a tenant. 
+func (s *TestDirectoryServer) StartTenant(ctx context.Context, id roachpb.TenantID) error { + select { + case <-s.stopper.ShouldQuiesce(): + return context.Canceled + default: + } + + ctx = logtags.AddTag(ctx, "tenant", id) + + s.proc.Lock() + defer s.proc.Unlock() + + process, err := s.TenantStarterFunc(ctx, id.ToUint64()) + if err != nil { + return err + } + + s.registerInstanceLocked(id.ToUint64(), process) + process.Stopper.AddCloser(stop.CloserFn(func() { + s.deregisterInstance(id.ToUint64(), process.SQL) + })) + + return nil +} + +// SetFakeLoad artificially sets the load reported by a +// specific tenant pod. +func (s *TestDirectoryServer) SetFakeLoad(id roachpb.TenantID, addr net.Addr, fakeLoad float32) { + s.proc.RLock() + defer s.proc.RUnlock() + processes, ok := s.proc.processByAddrByTenantID[id.ToUint64()] + if !ok { + return + } + process, ok := processes[addr] + if !ok { + return + } + process.FakeLoad = fakeLoad + + s.listen.RLock() + defer s.listen.RUnlock() + s.notifyEventListenersLocked(&tenant.WatchPodsResponse{ + Pod: &tenant.Pod{ + Addr: process.SQL.String(), + TenantID: id.ToUint64(), + Load: process.FakeLoad, + State: tenant.UNKNOWN, + }, + }) +} + // GetTenant returns tenant metadata for a given ID. 
Hard coded to return every // tenant's cluster name as "tenant-cluster" func (s *TestDirectoryServer) GetTenant( @@ -274,8 +330,13 @@ func (s *TestDirectoryServer) listLocked( return &tenant.ListPodsResponse{}, nil } resp := tenant.ListPodsResponse{} - for addr := range processByAddr { - resp.Pods = append(resp.Pods, &tenant.Pod{Addr: addr.String()}) + for addr, proc := range processByAddr { + resp.Pods = append(resp.Pods, &tenant.Pod{ + TenantID: req.TenantID, + Addr: addr.String(), + State: tenant.RUNNING, + Load: proc.FakeLoad, + }) } return &resp, nil } @@ -295,6 +356,7 @@ func (s *TestDirectoryServer) registerInstanceLocked(tenantID uint64, process *P TenantID: tenantID, Addr: process.SQL.String(), State: tenant.RUNNING, + Load: process.FakeLoad, }, }) } @@ -336,7 +398,7 @@ func (s *TestDirectoryServer) startTenantLocked( if err != nil { return nil, err } - process := &Process{SQL: sql.Addr()} + process := &Process{SQL: sql.Addr(), FakeLoad: 0.01} args := []string{ "mt", "start-sql", "--kv-addrs=127.0.0.1:26257", "--idle-exit-after=30s", fmt.Sprintf("--sql-addr=%s", sql.Addr().String()),