diff --git a/internal/internal.go b/internal/internal.go index 83018be7c718..9ce1f18ae9d6 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -77,6 +77,9 @@ var ( // ClearExtraDialOptions clears the array of extra DialOption. This // method is useful in testing and benchmarking. ClearExtraDialOptions func() + // JoinServerOptions combines the server options passed as arguments into a + // single server option. + JoinServerOptions interface{} // func(...grpc.ServerOption) grpc.ServerOption // NewXDSResolverWithConfigForTesting creates a new xds resolver builder using // the provided xds bootstrap config instead of the global configuration from diff --git a/orca/call_metric_recorder.go b/orca/call_metric_recorder.go new file mode 100644 index 000000000000..62f2a1a6c220 --- /dev/null +++ b/orca/call_metric_recorder.go @@ -0,0 +1,130 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package orca + +import ( + "context" + "sync" + "sync/atomic" + + v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" +) + +// CallMetricRecorder provides functionality to record per-RPC custom backend +// metrics. See CallMetricsServerOption() for more details. +// +// Safe for concurrent use. +type CallMetricRecorder struct { + cpu atomic.Value // float64 + memory atomic.Value // float64 + + mu sync.RWMutex + requestCost map[string]float64 + utilization map[string]float64 +} + +func newCallMetricRecorder() *CallMetricRecorder { + return &CallMetricRecorder{ + requestCost: make(map[string]float64), + utilization: make(map[string]float64), + } +} + +// SetCPUUtilization records a measurement for the CPU utilization metric. +func (c *CallMetricRecorder) SetCPUUtilization(val float64) { + c.cpu.Store(val) +} + +// SetMemoryUtilization records a measurement for the memory utilization metric. +func (c *CallMetricRecorder) SetMemoryUtilization(val float64) { + c.memory.Store(val) +} + +// SetRequestCost records a measurement for a request cost metric, +// uniquely identifiable by name. +func (c *CallMetricRecorder) SetRequestCost(name string, val float64) { + c.mu.Lock() + c.requestCost[name] = val + c.mu.Unlock() +} + +// SetUtilization records a measurement for a utilization metric uniquely +// identifiable by name. +func (c *CallMetricRecorder) SetUtilization(name string, val float64) { + c.mu.Lock() + c.utilization[name] = val + c.mu.Unlock() +} + +// toLoadReportProto dumps the recorded measurements as an OrcaLoadReport proto. +func (c *CallMetricRecorder) toLoadReportProto() *v3orcapb.OrcaLoadReport { + c.mu.RLock() + defer c.mu.RUnlock() + + cost := make(map[string]float64, len(c.requestCost)) + for k, v := range c.requestCost { + cost[k] = v + } + util := make(map[string]float64, len(c.utilization)) + for k, v := range c.utilization { + util[k] = v + } + cpu, _ := c.cpu.Load().(float64) + mem, _ := c.memory.Load().(float64) + return &v3orcapb.OrcaLoadReport{ + CpuUtilization: cpu, + MemUtilization: mem, + RequestCost: cost, + Utilization: util, + } +} + +type callMetricRecorderCtxKey struct{} + +// CallMetricRecorderFromContext returns the RPC specific custom metrics +// recorder [CallMetricRecorder] embedded in the provided RPC context. +// +// Returns nil if no custom metrics recorder is found in the provided context, +// which will be the case when custom metrics reporting is not enabled. +func CallMetricRecorderFromContext(ctx context.Context) *CallMetricRecorder { + rw, ok := ctx.Value(callMetricRecorderCtxKey{}).(*recorderWrapper) + if !ok { + return nil + } + return rw.recorder() +} + +func newContextWithRecorderWrapper(ctx context.Context, r *recorderWrapper) context.Context { + return context.WithValue(ctx, callMetricRecorderCtxKey{}, r) +} + +// recorderWrapper is a wrapper around a CallMetricRecorder to ensures that +// concurrent calls to CallMetricRecorderFromContext() results in only one +// allocation of the underlying metric recorder. +type recorderWrapper struct { + once sync.Once + r *CallMetricRecorder +} + +func (rw *recorderWrapper) recorder() *CallMetricRecorder { + rw.once.Do(func() { + rw.r = newCallMetricRecorder() + }) + return rw.r +} diff --git a/orca/call_metric_recorder_test.go b/orca/call_metric_recorder_test.go new file mode 100644 index 000000000000..f18d7259c249 --- /dev/null +++ b/orca/call_metric_recorder_test.go @@ -0,0 +1,300 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package orca_test + +import ( + "context" + "errors" + "io" + "testing" + "time" + + "github.com/golang/protobuf/proto" + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/pretty" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/orca" + + v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" + testgrpc "google.golang.org/grpc/test/grpc_testing" + testpb "google.golang.org/grpc/test/grpc_testing" +) + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +const defaultTestTimeout = 5 * time.Second + +// TestE2ECallMetricsUnary tests the injection of custom backend metrics from +// the server application for a unary RPC, and verifies that expected load +// reports are received at the client. +func (s) TestE2ECallMetricsUnary(t *testing.T) { + tests := []struct { + desc string + injectMetrics bool + wantProto *v3orcapb.OrcaLoadReport + wantErr error + }{ + { + desc: "with custom backend metrics", + injectMetrics: true, + wantProto: &v3orcapb.OrcaLoadReport{ + CpuUtilization: 1.0, + MemUtilization: 50.0, + RequestCost: map[string]float64{"queryCost": 25.0}, + Utilization: map[string]float64{"queueSize": 75.0}, + }, + }, + { + desc: "with no custom backend metrics", + injectMetrics: false, + wantErr: orca.ErrLoadReportMissing, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + // A server option to enables reporting of per-call backend metrics. + callMetricsServerOption := orca.CallMetricsServerOption() + + // An interceptor to injects custom backend metrics, added only when + // the injectMetrics field in the test is set. + injectingInterceptor := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + recorder := orca.CallMetricRecorderFromContext(ctx) + if recorder == nil { + err := errors.New("Failed to retrieve per-RPC custom metrics recorder from the RPC context") + t.Error(err) + return nil, err + } + recorder.SetCPUUtilization(1.0) + recorder.SetMemoryUtilization(50.0) + // This value will be overwritten by a write to the same metric + // from the server handler. + recorder.SetUtilization("queueSize", 1.0) + return handler(ctx, req) + } + + // A stub server whose unary handler injects custom metrics, if the + // injectMetrics field in the test is set. It overwrites one of the + // values injected above, by the interceptor. + srv := stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + if !test.injectMetrics { + return &testpb.Empty{}, nil + } + recorder := orca.CallMetricRecorderFromContext(ctx) + if recorder == nil { + err := errors.New("Failed to retrieve per-RPC custom metrics recorder from the RPC context") + t.Error(err) + return nil, err + } + recorder.SetRequestCost("queryCost", 25.0) + recorder.SetUtilization("queueSize", 75.0) + return &testpb.Empty{}, nil + }, + } + + // Start the stub server with the appropriate server options. + sopts := []grpc.ServerOption{callMetricsServerOption} + if test.injectMetrics { + sopts = append(sopts, grpc.ChainUnaryInterceptor(injectingInterceptor)) + } + if err := srv.StartServer(sopts...); err != nil { + t.Fatalf("Failed to start server: %v", err) + } + defer srv.Stop() + + // Dial the stub server. + cc, err := grpc.Dial(srv.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.Dial(%s) failed: %v", srv.Address, err) + } + defer cc.Close() + + // Make a unary RPC and expect the trailer metadata to contain the custom + // backend metrics as an ORCA LoadReport protobuf message. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + client := testgrpc.NewTestServiceClient(cc) + trailer := metadata.MD{} + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Trailer(&trailer)); err != nil { + t.Fatalf("EmptyCall failed: %v", err) + } + + gotProto, err := orca.ToLoadReport(trailer) + if test.wantErr != nil && !errors.Is(err, test.wantErr) { + t.Fatalf("When retrieving load report, got error: %v, want: %v", err, orca.ErrLoadReportMissing) + } + if test.wantProto != nil && !cmp.Equal(gotProto, test.wantProto, cmp.Comparer(proto.Equal)) { + t.Fatalf("Received load report in trailer: %s, want: %s", pretty.ToJSON(gotProto), pretty.ToJSON(test.wantProto)) + } + }) + } +} + +// TestE2ECallMetricsStreaming tests the injection of custom backend metrics +// from the server application for a streaming RPC, and verifies that expected +// load reports are received at the client. +func (s) TestE2ECallMetricsStreaming(t *testing.T) { + tests := []struct { + desc string + injectMetrics bool + wantProto *v3orcapb.OrcaLoadReport + wantErr error + }{ + { + desc: "with custom backend metrics", + injectMetrics: true, + wantProto: &v3orcapb.OrcaLoadReport{ + CpuUtilization: 1.0, + MemUtilization: 50.0, + RequestCost: map[string]float64{"queryCost": 25.0}, + Utilization: map[string]float64{"queueSize": 75.0}, + }, + }, + { + desc: "with no custom backend metrics", + injectMetrics: false, + wantErr: orca.ErrLoadReportMissing, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + // A server option to enables reporting of per-call backend metrics. + callMetricsServerOption := orca.CallMetricsServerOption() + + // An interceptor which injects custom backend metrics, added only + // when the injectMetrics field in the test is set. + injectingInterceptor := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + recorder := orca.CallMetricRecorderFromContext(ss.Context()) + if recorder == nil { + err := errors.New("Failed to retrieve per-RPC custom metrics recorder from the RPC context") + t.Error(err) + return err + } + recorder.SetCPUUtilization(1.0) + recorder.SetMemoryUtilization(50.0) + // This value will be overwritten by a write to the same metric + // from the server handler. + recorder.SetUtilization("queueSize", 1.0) + return handler(srv, ss) + } + + // A stub server whose streaming handler injects custom metrics, if + // the injectMetrics field in the test is set. It overwrites one of + // the values injected above, by the interceptor. + srv := stubserver.StubServer{ + FullDuplexCallF: func(stream testgrpc.TestService_FullDuplexCallServer) error { + if test.injectMetrics { + recorder := orca.CallMetricRecorderFromContext(stream.Context()) + if recorder == nil { + err := errors.New("Failed to retrieve per-RPC custom metrics recorder from the RPC context") + t.Error(err) + return err + } + recorder.SetRequestCost("queryCost", 25.0) + recorder.SetUtilization("queueSize", 75.0) + } + + // Streaming implementation replies with a dummy response until the + // client closes the stream (in which case it will see an io.EOF), + // or an error occurs while reading/writing messages. + for { + _, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + payload := &testpb.Payload{Body: make([]byte, 32)} + if err := stream.Send(&testpb.StreamingOutputCallResponse{Payload: payload}); err != nil { + return err + } + } + }, + } + + // Start the stub server with the appropriate server options. + sopts := []grpc.ServerOption{callMetricsServerOption} + if test.injectMetrics { + sopts = append(sopts, grpc.ChainStreamInterceptor(injectingInterceptor)) + } + if err := srv.StartServer(sopts...); err != nil { + t.Fatalf("Failed to start server: %v", err) + } + defer srv.Stop() + + // Dial the stub server. + cc, err := grpc.Dial(srv.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.Dial(%s) failed: %v", srv.Address, err) + } + defer cc.Close() + + // Start the full duplex streaming RPC. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + tc := testgrpc.NewTestServiceClient(cc) + stream, err := tc.FullDuplexCall(ctx) + if err != nil { + t.Fatalf("FullDuplexCall failed: %v", err) + } + + // Send one request to the server. + payload := &testpb.Payload{Type: testpb.PayloadType_RANDOM, Body: make([]byte, 32)} + req := &testpb.StreamingOutputCallRequest{Payload: payload} + if err := stream.Send(req); err != nil { + t.Fatalf("stream.Send() failed: %v", err) + } + // Read one reply from the server. + if _, err := stream.Recv(); err != nil { + t.Fatalf("stream.Recv() failed: %v", err) + } + // Close the sending side. + if err := stream.CloseSend(); err != nil { + t.Fatalf("stream.CloseSend() failed: %v", err) + } + // Make sure it is safe to read the trailer. + for { + if _, err := stream.Recv(); err != nil { + break + } + } + + gotProto, err := orca.ToLoadReport(stream.Trailer()) + if test.wantErr != nil && !errors.Is(err, test.wantErr) { + t.Fatalf("When retrieving load report, got error: %v, want: %v", err, orca.ErrLoadReportMissing) + } + if test.wantProto != nil && !cmp.Equal(gotProto, test.wantProto, cmp.Comparer(proto.Equal)) { + t.Fatalf("Received load report in trailer: %s, want: %s", pretty.ToJSON(gotProto), pretty.ToJSON(test.wantProto)) + } + }) + } +} diff --git a/orca/internal/internal.go b/orca/internal/internal.go new file mode 100644 index 000000000000..882fd8287a9b --- /dev/null +++ b/orca/internal/internal.go @@ -0,0 +1,27 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package internal contains orca-internal code, for testing purposes and to +// avoid polluting the godoc of the top-level orca package. +package internal + +// AllowAnyMinReportingInterval prevents clamping of the MinReportingInterval +// configured via ServiceOptions, to a minimum of 30s. +// +// For testing purposes only. +var AllowAnyMinReportingInterval interface{} // func(*ServiceOptions) diff --git a/orca/orca.go b/orca/orca.go new file mode 100644 index 000000000000..414f6ed6ef4f --- /dev/null +++ b/orca/orca.go @@ -0,0 +1,164 @@ +/* + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package orca implements Open Request Cost Aggregation, which is an open +// standard for request cost aggregation and reporting by backends and the +// corresponding aggregation of such reports by L7 load balancers (such as +// Envoy) on the data plane. In a proxyless world with gRPC enabled +// applications, aggregation of such reports will be done by the gRPC client. +// +// Experimental +// +// Notice: All APIs is this package are EXPERIMENTAL and may be changed or +// removed in a later release. +package orca + +import ( + "context" + "errors" + "fmt" + + "google.golang.org/grpc" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/proto" + + v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" +) + +var ( + logger = grpclog.Component("orca-backend-metrics") + joinServerOptions = internal.JoinServerOptions.(func(...grpc.ServerOption) grpc.ServerOption) +) + +const trailerMetadataKey = "endpoint-load-metrics-bin" + +// CallMetricsServerOption returns a server option which enables the reporting +// of per-RPC custom backend metrics for unary and streaming RPCs. +// +// Server applications interested in injecting custom backend metrics should +// pass the server option returned from this function as the first argument to +// grpc.NewServer(). +// +// Subsequently, server RPC handlers can retrieve a reference to the RPC +// specific custom metrics recorder [CallMetricRecorder] to be used, via a call +// to CallMetricRecorderFromContext(), and inject custom metrics at any time +// during the RPC lifecycle. +// +// The injected custom metrics will be sent as part of trailer metadata, as a +// binary-encoded [ORCA LoadReport] protobuf message, with the metadata key +// being set be "endpoint-load-metrics-bin". +// +// [ORCA LoadReport]: https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15 +func CallMetricsServerOption() grpc.ServerOption { + return joinServerOptions(grpc.ChainUnaryInterceptor(unaryInt), grpc.ChainStreamInterceptor(streamInt)) +} + +func unaryInt(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + // We don't allocate the metric recorder here. It will be allocated the + // first time the user calls CallMetricRecorderFromContext(). + rw := &recorderWrapper{} + ctxWithRecorder := newContextWithRecorderWrapper(ctx, rw) + + resp, err := handler(ctxWithRecorder, req) + + // It is safe to access the underlying metric recorder inside the wrapper at + // this point, as the user's RPC handler is done executing, and therefore + // there will be no more calls to CallMetricRecorderFromContext(), which is + // where the metric recorder is lazy allocated. + if rw.r == nil { + return resp, err + } + setTrailerMetadata(ctx, rw.r) + return resp, err +} + +func streamInt(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + // We don't allocate the metric recorder here. It will be allocated the + // first time the user calls CallMetricRecorderFromContext(). + rw := &recorderWrapper{} + ws := &wrappedStream{ + ServerStream: ss, + ctx: newContextWithRecorderWrapper(ss.Context(), rw), + } + + err := handler(srv, ws) + + // It is safe to access the underlying metric recorder inside the wrapper at + // this point, as the user's RPC handler is done executing, and therefore + // there will be no more calls to CallMetricRecorderFromContext(), which is + // where the metric recorder is lazy allocated. + if rw.r == nil { + return err + } + setTrailerMetadata(ss.Context(), rw.r) + return err +} + +// setTrailerMetadata adds a trailer metadata entry with key being set to +// `trailerMetadataKey` and value being set to the binary-encoded +// orca.OrcaLoadReport protobuf message. +// +// This function is called from the unary and streaming interceptors defined +// above. Any errors encountered here are not propagated to the caller because +// they are ignored there. Hence we simply log any errors encountered here at +// warning level, and return nothing. +func setTrailerMetadata(ctx context.Context, r *CallMetricRecorder) { + b, err := proto.Marshal(r.toLoadReportProto()) + if err != nil { + logger.Warningf("failed to marshal load report: %v", err) + return + } + if err := grpc.SetTrailer(ctx, metadata.Pairs(trailerMetadataKey, string(b))); err != nil { + logger.Warningf("failed to set trailer metadata: %v", err) + } +} + +// wrappedStream wraps the grpc.ServerStream received by the streaming +// interceptor. Overrides only the Context() method to return a context which +// contains a reference to the CallMetricRecorder corresponding to this stream. +type wrappedStream struct { + grpc.ServerStream + ctx context.Context +} + +func (w *wrappedStream) Context() context.Context { + return w.ctx +} + +// ErrLoadReportMissing indicates no ORCA load report was found in trailers. +var ErrLoadReportMissing = errors.New("orca load report missing in provided metadata") + +// ToLoadReport unmarshals a binary encoded [ORCA LoadReport] protobuf message +// from md and returns the corresponding struct. The load report is expected to +// be stored as the value for key "endpoint-load-metrics-bin". +// +// If no load report was found in the provided metadata, ErrLoadReportMissing is +// returned. +// +// [ORCA LoadReport]: (https://github.com/cncf/xds/blob/main/xds/data/orca/v3/orca_load_report.proto#L15) +func ToLoadReport(md metadata.MD) (*v3orcapb.OrcaLoadReport, error) { + vs := md.Get(trailerMetadataKey) + if len(vs) == 0 { + return nil, ErrLoadReportMissing + } + ret := new(v3orcapb.OrcaLoadReport) + if err := proto.Unmarshal([]byte(vs[0]), ret); err != nil { + return nil, fmt.Errorf("failed to unmarshal load report found in metadata: %v", err) + } + return ret, nil +} diff --git a/orca/orca_test.go b/orca/orca_test.go new file mode 100644 index 000000000000..fd356cfba437 --- /dev/null +++ b/orca/orca_test.go @@ -0,0 +1,86 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package orca_test + +import ( + "testing" + + "github.com/golang/protobuf/proto" + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/internal/pretty" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/orca" + + v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" +) + +func TestToLoadReport(t *testing.T) { + tests := []struct { + name string + md metadata.MD + want *v3orcapb.OrcaLoadReport + wantErr bool + }{ + { + name: "no load report in metadata", + md: metadata.MD{}, + wantErr: true, + }, + { + name: "badly marshaled load report", + md: func() metadata.MD { + return metadata.Pairs("endpoint-load-metrics-bin", string("foo-bar")) + }(), + wantErr: true, + }, + { + name: "good load report", + md: func() metadata.MD { + b, _ := proto.Marshal(&v3orcapb.OrcaLoadReport{ + CpuUtilization: 1.0, + MemUtilization: 50.0, + RequestCost: map[string]float64{"queryCost": 25.0}, + Utilization: map[string]float64{"queueSize": 75.0}, + }) + return metadata.Pairs("endpoint-load-metrics-bin", string(b)) + }(), + want: &v3orcapb.OrcaLoadReport{ + CpuUtilization: 1.0, + MemUtilization: 50.0, + RequestCost: map[string]float64{"queryCost": 25.0}, + Utilization: map[string]float64{"queueSize": 75.0}, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got, err := orca.ToLoadReport(test.md) + if (err != nil) != test.wantErr { + t.Fatalf("orca.ToLoadReport(%v) = %v, wantErr: %v", test.md, err, test.wantErr) + } + if test.wantErr { + return + } + if !cmp.Equal(got, test.want, cmp.Comparer(proto.Equal)) { + t.Fatalf("Extracted load report from metadata: %s, want: %s", pretty.ToJSON(got), pretty.ToJSON(test.want)) + } + }) + } +} diff --git a/orca/service.go b/orca/service.go new file mode 100644 index 000000000000..d36b76f2a9b0 --- /dev/null +++ b/orca/service.go @@ -0,0 +1,194 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package orca + +import ( + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/orca/internal" + "google.golang.org/grpc/status" + + v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" + v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3" + v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3" +) + +func init() { + internal.AllowAnyMinReportingInterval = func(so *ServiceOptions) { + so.allowAnyMinReportingInterval = true + } +} + +// minReportingInterval is the absolute minimum value supported for +// out-of-band metrics reporting from the ORCA service implementation +// provided by the orca package. +const minReportingInterval = 30 * time.Second + +// Service provides an implementation of the OpenRcaService as defined in the +// [ORCA] service protos. Instances of this type must be created via calls to +// Register() or NewService(). +// +// Server applications can use the SetXxx() and DeleteXxx() methods to record +// measurements corresponding to backend metrics, which eventually get pushed to +// clients who have initiated the SteamCoreMetrics streaming RPC. +// +// [ORCA]: https://github.com/cncf/xds/blob/main/xds/service/orca/v3/orca.proto +type Service struct { + v3orcaservicegrpc.UnimplementedOpenRcaServiceServer + + // Minimum reporting interval, as configured by the user, or the default. + minReportingInterval time.Duration + + // mu guards the custom metrics injected by the server application. + mu sync.RWMutex + cpu float64 + memory float64 + utilization map[string]float64 +} + +// ServiceOptions contains options to configure the ORCA service implementation. +type ServiceOptions struct { + // MinReportingInterval sets the lower bound for how often out-of-band + // metrics are reported on the streaming RPC initiated by the client. If + // unspecified, negative or less than the default value of 30s, the default + // is used. Clients may request a higher value as part of the + // StreamCoreMetrics streaming RPC. + MinReportingInterval time.Duration + + // Allow a minReportingInterval which is less than the default of 30s. + // Used for testing purposes only. + allowAnyMinReportingInterval bool +} + +// NewService creates a new ORCA service implementation configured using the +// provided options. +func NewService(opts ServiceOptions) (*Service, error) { + // The default minimum supported reporting interval value can be overridden + // for testing purposes through the orca internal package. + if !opts.allowAnyMinReportingInterval { + if opts.MinReportingInterval < 0 || opts.MinReportingInterval < minReportingInterval { + opts.MinReportingInterval = minReportingInterval + } + } + service := &Service{ + minReportingInterval: opts.MinReportingInterval, + utilization: make(map[string]float64), + } + return service, nil +} + +// Register creates a new ORCA service implementation configured using the +// provided options and registers the same on the provided service registrar. +func Register(s *grpc.Server, opts ServiceOptions) (*Service, error) { + service, err := NewService(opts) + if err != nil { + return nil, err + } + v3orcaservicegrpc.RegisterOpenRcaServiceServer(s, service) + return service, nil +} + +// determineReportingInterval determines the reporting interval for out-of-band +// metrics. If the reporting interval is not specified in the request, or is +// negative or is less than the configured minimum (via +// ServiceOptions.MinReportingInterval), the latter is used. Else the value from +// the incoming request is used. +func (s *Service) determineReportingInterval(req *v3orcaservicegrpc.OrcaLoadReportRequest) time.Duration { + if req.GetReportInterval() == nil { + return s.minReportingInterval + } + dur := req.GetReportInterval().AsDuration() + if dur < s.minReportingInterval { + logger.Warningf("Received reporting interval %q is less than configured minimum: %v. Using default: %s", dur, s.minReportingInterval) + return s.minReportingInterval + } + return dur +} + +func (s *Service) sendMetricsResponse(stream v3orcaservicegrpc.OpenRcaService_StreamCoreMetricsServer) error { + return stream.Send(s.toLoadReportProto()) +} + +// StreamCoreMetrics streams custom backend metrics injected by the server +// application. +func (s *Service) StreamCoreMetrics(req *v3orcaservicepb.OrcaLoadReportRequest, stream v3orcaservicepb.OpenRcaService_StreamCoreMetricsServer) error { + ticker := time.NewTicker(s.determineReportingInterval(req)) + defer ticker.Stop() + + for { + if err := s.sendMetricsResponse(stream); err != nil { + return err + } + // Send a response containing the currently recorded metrics + select { + case <-stream.Context().Done(): + return status.Error(codes.Canceled, "Stream has ended.") + case <-ticker.C: + } + } +} + +// SetCPUUtilization records a measurement for the CPU utilization metric. +func (s *Service) SetCPUUtilization(val float64) { + s.mu.Lock() + s.cpu = val + s.mu.Unlock() +} + +// SetMemoryUtilization records a measurement for the memory utilization metric. +func (s *Service) SetMemoryUtilization(val float64) { + s.mu.Lock() + s.memory = val + s.mu.Unlock() +} + +// SetUtilization records a measurement for a utilization metric uniquely +// identifiable by name. +func (s *Service) SetUtilization(name string, val float64) { + s.mu.Lock() + s.utilization[name] = val + s.mu.Unlock() +} + +// DeleteUtilization deletes any previously recorded measurement for a +// utilization metric uniquely identifiable by name. +func (s *Service) DeleteUtilization(name string) { + s.mu.Lock() + delete(s.utilization, name) + s.mu.Unlock() +} + +// toLoadReportProto dumps the recorded measurements as an OrcaLoadReport proto. +func (s *Service) toLoadReportProto() *v3orcapb.OrcaLoadReport { + s.mu.RLock() + defer s.mu.RUnlock() + + util := make(map[string]float64, len(s.utilization)) + for k, v := range s.utilization { + util[k] = v + } + return &v3orcapb.OrcaLoadReport{ + CpuUtilization: s.cpu, + MemUtilization: s.memory, + Utilization: util, + } +} diff --git a/orca/service_test.go b/orca/service_test.go new file mode 100644 index 000000000000..b9eff4365786 --- /dev/null +++ b/orca/service_test.go @@ -0,0 +1,193 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package orca_test + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/golang/protobuf/proto" + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/pretty" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/orca" + "google.golang.org/grpc/orca/internal" + "google.golang.org/protobuf/types/known/durationpb" + + v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" + v3orcaservicegrpc "github.com/cncf/xds/go/xds/service/orca/v3" + v3orcaservicepb "github.com/cncf/xds/go/xds/service/orca/v3" + testgrpc "google.golang.org/grpc/test/grpc_testing" + testpb "google.golang.org/grpc/test/grpc_testing" +) + +const requestsMetricKey = "test-service-requests" + +// An implementation of grpc_testing.TestService for the purpose of this test. +// We cannot use the StubServer approach here because we need to register the +// OpenRCAService as well on the same gRPC server. +type testServiceImpl struct { + mu sync.Mutex + requests int64 + + testgrpc.TestServiceServer + orcaSrv *orca.Service +} + +func (t *testServiceImpl) UnaryCall(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + t.mu.Lock() + t.requests++ + t.mu.Unlock() + + t.orcaSrv.SetUtilization(requestsMetricKey, float64(t.requests)) + t.orcaSrv.SetCPUUtilization(50.0) + t.orcaSrv.SetMemoryUtilization(99.0) + return &testpb.SimpleResponse{}, nil +} + +func (t *testServiceImpl) EmptyCall(context.Context, *testpb.Empty) (*testpb.Empty, error) { + t.orcaSrv.DeleteUtilization(requestsMetricKey) + t.orcaSrv.SetCPUUtilization(0) + t.orcaSrv.SetMemoryUtilization(0) + return &testpb.Empty{}, nil +} + +// Test_E2E_CustomBackendMetrics_OutOfBand tests the injection of out-of-band +// custom backend metrics from the server application, and verifies that +// expected load reports are received at the client. +// +// TODO: Change this test to use the client API, when ready, to read the +// out-of-band metrics pushed by the server. +func (s) Test_E2E_CustomBackendMetrics_OutOfBand(t *testing.T) { + lis, err := testutils.LocalTCPListener() + if err != nil { + t.Fatal(err) + } + + // Override the min reporting interval in the internal package. + const shortReportingInterval = 100 * time.Millisecond + opts := orca.ServiceOptions{MinReportingInterval: shortReportingInterval} + internal.AllowAnyMinReportingInterval.(func(*orca.ServiceOptions))(&opts) + + // Register the OpenRCAService with a very short metrics reporting interval. + s := grpc.NewServer() + orcaSrv, err := orca.Register(s, opts) + if err != nil { + t.Fatalf("orca.EnableOutOfBandMetricsReportingForTesting() failed: %v", err) + } + + // Register the test service implementation on the same grpc server, and start serving. + testpb.RegisterTestServiceServer(s, &testServiceImpl{orcaSrv: orcaSrv}) + go s.Serve(lis) + defer s.Stop() + t.Logf("Started gRPC server at %s...", lis.Addr().String()) + + // Dial the test server. + cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.Dial(%s) failed: %v", lis.Addr().String(), err) + } + defer cc.Close() + + // Spawn a goroutine which sends 100 unary RPCs to the test server. This + // will trigger the injection of custom backend metrics from the + // testServiceImpl. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + testStub := testgrpc.NewTestServiceClient(cc) + errCh := make(chan error, 1) + go func() { + for i := 0; i < 100; i++ { + if _, err := testStub.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { + errCh <- fmt.Errorf("UnaryCall failed: %v", err) + return + } + time.Sleep(10 * time.Millisecond) + } + errCh <- nil + }() + + // Start the server streaming RPC to receive custom backend metrics. + oobStub := v3orcaservicegrpc.NewOpenRcaServiceClient(cc) + stream, err := oobStub.StreamCoreMetrics(ctx, &v3orcaservicepb.OrcaLoadReportRequest{ReportInterval: durationpb.New(shortReportingInterval)}) + if err != nil { + t.Fatalf("Failed to create a stream for out-of-band metrics") + } + + // Wait for the server to push metrics which indicate the completion of all + // the unary RPCs made from the above goroutine. + for { + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for out-of-band custom backend metrics to match expected values") + case err := <-errCh: + if err != nil { + t.Fatal(err) + } + default: + } + + wantProto := &v3orcapb.OrcaLoadReport{ + CpuUtilization: 50.0, + MemUtilization: 99.0, + Utilization: map[string]float64{requestsMetricKey: 100.0}, + } + gotProto, err := stream.Recv() + if err != nil { + t.Fatalf("Recv() failed: %v", err) + } + if !cmp.Equal(gotProto, wantProto, cmp.Comparer(proto.Equal)) { + t.Logf("Received load report from stream: %s, want: %s", pretty.ToJSON(gotProto), pretty.ToJSON(wantProto)) + continue + } + // This means that we received the metrics which we expected. + break + } + + // The EmptyCall RPC is expected to delete earlier injected metrics. + if _, err := testStub.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("EmptyCall failed: %v", err) + } + // Wait for the server to push empty metrics which indicate the processing + // of the above EmptyCall RPC. + for { + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for out-of-band custom backend metrics to match expected values") + default: + } + + wantProto := &v3orcapb.OrcaLoadReport{} + gotProto, err := stream.Recv() + if err != nil { + t.Fatalf("Recv() failed: %v", err) + } + if !cmp.Equal(gotProto, wantProto, cmp.Comparer(proto.Equal)) { + t.Logf("Received load report from stream: %s, want: %s", pretty.ToJSON(gotProto), pretty.ToJSON(wantProto)) + continue + } + // This means that we received the metrics which we expected. + break + } +} diff --git a/server.go b/server.go index 2ad9da7bfccf..6ef3df67d9e5 100644 --- a/server.go +++ b/server.go @@ -79,6 +79,7 @@ func init() { internal.ClearExtraServerOptions = func() { extraServerOptions = nil } + internal.JoinServerOptions = newJoinServerOption } var statusOK = status.New(codes.OK, "") @@ -214,6 +215,22 @@ func newFuncServerOption(f func(*serverOptions)) *funcServerOption { } } +// joinServerOption provides a way to combine arbitrary number of server +// options into one. +type joinServerOption struct { + opts []ServerOption +} + +func (mdo *joinServerOption) apply(do *serverOptions) { + for _, opt := range mdo.opts { + opt.apply(do) + } +} + +func newJoinServerOption(opts ...ServerOption) ServerOption { + return &joinServerOption{opts: opts} +} + // WriteBufferSize determines how much data can be batched before doing a write on the wire. // The corresponding memory allocation for this buffer will be twice the size to keep syscalls low. // The default value for this buffer is 32KB.