Skip to content

Commit

Permalink
examples: add example for ORCA load reporting (#6114)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Mar 14, 2023
1 parent b458a4f commit 16c3b7d
Show file tree
Hide file tree
Showing 7 changed files with 305 additions and 3 deletions.
4 changes: 4 additions & 0 deletions examples/examples_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ EXAMPLES=(
"features/metadata_interceptor"
"features/multiplex"
"features/name_resolving"
"features/orca"
"features/retry"
"features/unix_abstract"
)
Expand All @@ -75,6 +76,7 @@ declare -A SERVER_ARGS=(

declare -A CLIENT_ARGS=(
["features/unix_abstract"]="-addr $UNIX_ADDR"
["features/orca"]="-test=true"
["default"]="-addr localhost:$SERVER_PORT"
)

Expand Down Expand Up @@ -114,6 +116,7 @@ declare -A EXPECTED_SERVER_OUTPUT=(
["features/metadata_interceptor"]="key1 from metadata: "
["features/multiplex"]=":50051"
["features/name_resolving"]="serving on localhost:50051"
["features/orca"]="Server listening"
["features/retry"]="request succeeded count: 4"
["features/unix_abstract"]="serving on @abstract-unix-socket"
)
Expand All @@ -134,6 +137,7 @@ declare -A EXPECTED_CLIENT_OUTPUT=(
["features/metadata_interceptor"]="BidiStreaming Echo: hello world"
["features/multiplex"]="Greeting: Hello multiplex"
["features/name_resolving"]="calling helloworld.Greeter/SayHello to \"example:///resolver.example.grpc.io\""
["features/orca"]="Per-call load report received: map\[db_queries:10\]"
["features/retry"]="UnaryEcho reply: message:\"Try and Success\""
["features/unix_abstract"]="calling echo.Echo/UnaryEcho to unix-abstract:abstract-unix-socket"
)
Expand Down
48 changes: 48 additions & 0 deletions examples/features/orca/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# ORCA Load Reporting

ORCA is a protocol for reporting load between servers and clients. This
example shows how to implement this from both the client and server side. For
more details, please see [gRFC
A51](https://github.com/grpc/proposal/blob/master/A51-custom-backend-metrics.md)

## Try it

```
go run server/main.go
```

```
go run client/main.go
```

## Explanation

gRPC ORCA support provides two different ways to report load data to clients
from servers: out-of-band and per-RPC. Out-of-band metrics are reported
regularly at some interval on a stream, while per-RPC metrics are reported
along with the trailers at the end of a call. Both of these mechanisms are
optional and work independently.

The full ORCA API documentation is available here:
https://pkg.go.dev/google.golang.org/grpc/orca

### Out-of-band Metrics

The server registers an ORCA service that is used for out-of-band metrics. It
does this by using `orca.Register()` and then setting metrics on the returned
`orca.Service` using its methods.

The client receives out-of-band metrics via the LB policy. It receives
callbacks to a listener by registering the listener on a `SubConn` via
`orca.RegisterOOBListener`.

### Per-RPC Metrics

The server is set up to report query cost metrics in its RPC handler. For
per-RPC metrics to be reported, the gRPC server must be created with the
`orca.CallMetricsServerOption()` option, and metrics are set by calling methods
on the returned `orca.CallMetricRecorder` from
`orca.CallMetricRecorderFromContext()`.

The client performs one RPC per second. Per-RPC metrics are available for each
call via the `Done()` callback returned from the LB policy's picker.
153 changes: 153 additions & 0 deletions examples/features/orca/client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Binary client is an example client.
package main

import (
"context"
"flag"
"fmt"
"log"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/orca"

v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
pb "google.golang.org/grpc/examples/features/proto/echo"
)

var addr = flag.String("addr", "localhost:50051", "the address to connect to")
var test = flag.Bool("test", false, "if set, only 1 RPC is performed before exiting")

func main() {
flag.Parse()

// Set up a connection to the server. Configure to use our custom LB
// policy which will receive all the ORCA load reports.
conn, err := grpc.Dial(*addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"orca_example":{}}]}`),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()

c := pb.NewEchoClient(conn)

// Perform RPCs once per second.
ticker := time.NewTicker(time.Second)
for range ticker.C {
func() {
// Use an anonymous function to ensure context cancelation via defer.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
if _, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: "test echo message"}); err != nil {
log.Fatalf("Error from UnaryEcho call: %v", err)
}
}()
if *test {
return
}
}

}

// Register an ORCA load balancing policy to receive per-call metrics and
// out-of-band metrics.
func init() {
balancer.Register(orcaLBBuilder{})
}

type orcaLBBuilder struct{}

func (orcaLBBuilder) Name() string { return "orca_example" }
func (orcaLBBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return &orcaLB{cc: cc}
}

// orcaLB is an incomplete LB policy designed to show basic ORCA load reporting
// functionality. It collects per-call metrics in the `Done` callback returned
// by its picker, and it collects out-of-band metrics by registering a listener
// when its SubConn is created. It does not follow general LB policy best
// practices and makes assumptions about the simple test environment it is
// designed to run within.
type orcaLB struct {
cc balancer.ClientConn
}

func (o *orcaLB) UpdateClientConnState(ccs balancer.ClientConnState) error {
// We assume only one update, ever, containing exactly one address, given
// the use of the "passthrough" (default) name resolver.

addrs := ccs.ResolverState.Addresses
if len(addrs) != 1 {
return fmt.Errorf("orcaLB: expected 1 address; received: %v", addrs)
}

// Create one SubConn for the address and connect it.
sc, err := o.cc.NewSubConn(addrs, balancer.NewSubConnOptions{})
if err != nil {
return fmt.Errorf("orcaLB: error creating SubConn: %v", err)
}
sc.Connect()

// Register a simple ORCA OOB listener on the SubConn. We request a 1
// second report interval, but in this example the server indicated the
// minimum interval it will allow is 3 seconds, so reports will only be
// sent that often.
orca.RegisterOOBListener(sc, orcaLis{}, orca.OOBListenerOptions{ReportInterval: time.Second})

return nil
}

func (o *orcaLB) ResolverError(error) {}

func (o *orcaLB) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
if scs.ConnectivityState == connectivity.Ready {
o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &picker{sc}})
}
}

func (o *orcaLB) Close() {}

type picker struct {
sc balancer.SubConn
}

func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
return balancer.PickResult{
SubConn: p.sc,
Done: func(di balancer.DoneInfo) {
fmt.Println("Per-call load report received:", di.ServerLoad.(*v3orcapb.OrcaLoadReport).GetRequestCost())
},
}, nil
}

// orcaLis is the out-of-band load report listener that we pass to
// orca.RegisterOOBListener to receive periodic load report information.
type orcaLis struct{}

func (orcaLis) OnLoadReport(lr *v3orcapb.OrcaLoadReport) {
fmt.Println("Out-of-band load report received:", lr)
}
92 changes: 92 additions & 0 deletions examples/features/orca/server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Binary server is an example server.
package main

import (
"context"
"flag"
"fmt"
"log"
"net"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/orca"
"google.golang.org/grpc/status"

pb "google.golang.org/grpc/examples/features/proto/echo"
)

var port = flag.Int("port", 50051, "the port to serve on")

type server struct {
pb.UnimplementedEchoServer
}

func (s *server) UnaryEcho(ctx context.Context, in *pb.EchoRequest) (*pb.EchoResponse, error) {
// Report a sample cost for this query.
cmr := orca.CallMetricRecorderFromContext(ctx)
if cmr == nil {
return nil, status.Errorf(codes.Internal, "unable to retrieve call metric recorder (missing ORCA ServerOption?)")
}
cmr.SetRequestCost("db_queries", 10)

return &pb.EchoResponse{Message: in.Message}, nil
}

func main() {
flag.Parse()

lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
fmt.Printf("Server listening at %v\n", lis.Addr())

// Create the gRPC server with the orca.CallMetricsServerOption() option,
// which will enable per-call metric recording.
s := grpc.NewServer(orca.CallMetricsServerOption())
pb.RegisterEchoServer(s, &server{})

// Register the orca service for out-of-band metric reporting, and set the
// minimum reporting interval to 3 seconds. Note that, by default, the
// minimum interval must be at least 30 seconds, but 3 seconds is set via
// an internal-only option for illustration purposes only.
opts := orca.ServiceOptions{MinReportingInterval: 3 * time.Second}
internal.ORCAAllowAnyMinReportingInterval.(func(so *orca.ServiceOptions))(&opts)
orcaSvc, err := orca.Register(s, opts)
if err != nil {
log.Fatalf("Failed to register ORCA service: %v", err)
}

// Simulate CPU utilization reporting.
go func() {
for {
orcaSvc.SetCPUUtilization(.5)
time.Sleep(2 * time.Second)
orcaSvc.SetCPUUtilization(.9)
time.Sleep(2 * time.Second)
}
}()

s.Serve(lis)
}
2 changes: 1 addition & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module google.golang.org/grpc/examples
go 1.17

require (
github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b
github.com/golang/protobuf v1.5.2
golang.org/x/oauth2 v0.4.0
google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f
Expand All @@ -16,7 +17,6 @@ require (
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe // indirect
github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b // indirect
github.com/envoyproxy/go-control-plane v0.10.3 // indirect
github.com/envoyproxy/protoc-gen-validate v0.9.1 // indirect
golang.org/x/net v0.8.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ var (
//
// TODO: Remove this function once the RBAC env var is removed.
UnregisterRBACHTTPFilterForTesting func()

// ORCAAllowAnyMinReportingInterval is for examples/orca use ONLY.
ORCAAllowAnyMinReportingInterval interface{} // func(so *orca.ServiceOptions)
)

// HealthChecker defines the signature of the client-side LB channel health checking function.
Expand Down
6 changes: 4 additions & 2 deletions orca/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import (

"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/orca/internal"
"google.golang.org/grpc/internal"
ointernal "google.golang.org/grpc/orca/internal"
"google.golang.org/grpc/status"

v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3"
Expand All @@ -33,9 +34,10 @@ import (
)

func init() {
internal.AllowAnyMinReportingInterval = func(so *ServiceOptions) {
ointernal.AllowAnyMinReportingInterval = func(so *ServiceOptions) {
so.allowAnyMinReportingInterval = true
}
internal.ORCAAllowAnyMinReportingInterval = ointernal.AllowAnyMinReportingInterval
}

// minReportingInterval is the absolute minimum value supported for
Expand Down

0 comments on commit 16c3b7d

Please sign in to comment.