Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

orca: create ORCA producer for LB policies to use to receive OOB load reports #5669

Merged
merged 9 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ type SubConn interface {
UpdateAddresses([]resolver.Address)
// Connect starts the connecting for this SubConn.
Connect()
// GetOrBuildProducer returns a reference to the existing Producer for this
// ProducerBuilder in this SubConn, or, if one does not currently exist,
// creates a new one and returns it. Returns a close function which must
// be called when the Producer is no longer needed.
GetOrBuildProducer(ProducerBuilder) (p Producer, close func())
dfawley marked this conversation as resolved.
Show resolved Hide resolved
}

// NewSubConnOptions contains options to create new SubConn.
Expand Down Expand Up @@ -371,3 +376,21 @@ type ClientConnState struct {
// ErrBadResolverState may be returned by UpdateClientConnState to indicate a
// problem with the provided name resolver data.
var ErrBadResolverState = errors.New("bad resolver state")

// A ProducerBuilder is a simple constructor for a Producer. It is used by the
// SubConn to create producers when needed.
type ProducerBuilder interface {
// Build creates a Producer. The first parameter is always a
// grpc.ClientConnInterface (a type to allow creating RPCs/streams on the
// associated SubConn), but is declared as interface{} to avoid a
// dependency cycle. Should also return a close function that will be
// called when all references to the Producer have been given up.
Build(grpcClientConnInterface interface{}) (p Producer, close func())
}

// A Producer is a type shared among potentially many consumers. It is
// associated with a SubConn, and an implementation will typically contain
// other methods to provide additional functionality, e.g. configuration or
// subscription registration.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This description feels very vague to me, in the sense that I can't get a good idea of when to use a Producer, how to use one etc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you suggest something better after our discussion today?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. The first sentence is fine, but I don't get much technical understanding from: "and an implementation will typically contain other methods to provide additional functionality, e.g. configuration or subscription registration." Additional functionality with respect to what? There's no functionality defined before this. Do we expect this to ever be implemented by another concrete type? This comment is very specific to type producer in orca/producer. If so, why have this interface?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additional functionality with respect to what? There's no functionality defined before this.

In addition to the builder that created it, I suppose, and the close function the builder returns.

Do we expect this to ever be implemented by another concrete type?

Yes, definitely. This is intended to be generic and able to be implemented by anything. The other example we already have is the health checks, which are implemented in the other languages as a generic wrapper, but hard-coded into gRPC in Go. I'd like to move to a similar model there by using this mechanism.

This comment is very specific to type producer in orca/producer.

Not at all. If it was, it would talk about ORCA configuration. ORCA should not be part of grpc-go directly, but instead should only be a plugin which uses this.

type Producer interface {
}
4 changes: 4 additions & 0 deletions balancer/base/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {}

func (sc *testSubConn) Connect() {}

func (sc *testSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
return nil, nil
}

// testPickBuilder creates balancer.Picker for test.
type testPickBuilder struct {
validate func(info PickerBuildInfo)
Expand Down
71 changes: 68 additions & 3 deletions balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,20 @@
package grpc

import (
"context"
"fmt"
"strings"
"sync"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/buffer"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
)

// ccBalancerWrapper sits between the ClientConn and the Balancer.
Expand Down Expand Up @@ -305,7 +308,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err)
return nil, err
}
acbw := &acBalancerWrapper{ac: ac}
acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer)}
acbw.ac.mu.Lock()
ac.acbw = acbw
acbw.ac.mu.Unlock()
Expand Down Expand Up @@ -359,8 +362,9 @@ func (ccb *ccBalancerWrapper) Target() string {
// acBalancerWrapper is a wrapper on top of ac for balancers.
// It implements balancer.SubConn interface.
type acBalancerWrapper struct {
mu sync.Mutex
ac *addrConn
mu sync.Mutex
ac *addrConn
producers map[balancer.ProducerBuilder]*refCountedProducer
}

func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
Expand Down Expand Up @@ -414,3 +418,64 @@ func (acbw *acBalancerWrapper) getAddrConn() *addrConn {
defer acbw.mu.Unlock()
return acbw.ac
}

var errSubConnNotReady = status.Error(codes.Unavailable, "SubConn not currently connected")
zasweq marked this conversation as resolved.
Show resolved Hide resolved

// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
// ready, returns errSubConnNotReady.
func (acbw *acBalancerWrapper) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
transport := acbw.ac.getReadyTransport()
if transport == nil {
return nil, errSubConnNotReady
}
return newNonRetryClientStream(ctx, desc, method, transport, acbw.ac, opts...)
}

// Invoke performs a unary RPC. If the addrConn is not ready, returns
// errSubConnNotReady.
func (acbw *acBalancerWrapper) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...CallOption) error {
cs, err := acbw.NewStream(ctx, unaryStreamDesc, method, opts...)
if err != nil {
return err
}
if err := cs.SendMsg(args); err != nil {
return err
}
return cs.RecvMsg(reply)
}

type refCountedProducer struct {
producer balancer.Producer
refs int // number of current refs to the producer
close func() // underlying producer's close function
}

func (acbw *acBalancerWrapper) GetOrBuildProducer(pb balancer.ProducerBuilder) (balancer.Producer, func()) {
acbw.mu.Lock()
defer acbw.mu.Unlock()

// Look up existing producer from this builder.
pData := acbw.producers[pb]
if pData == nil {
// Not found; create a new one and add it to the producers map.
p, close := pb.Build(acbw)
pData = &refCountedProducer{producer: p, close: close}
acbw.producers[pb] = pData
}
// Account for this new reference.
pData.refs++

// Return a cleanup function wrapped in a OnceFunc to remove this reference
// and delete the refCountedProducer from the map if the total reference
// count goes to zero.
unref := func() {
acbw.mu.Lock()
pData.refs--
if pData.refs == 0 {
zasweq marked this conversation as resolved.
Show resolved Hide resolved
defer pData.close() // Run outside the acbw mutex
delete(acbw.producers, pb)
}
acbw.mu.Unlock()
}
return pData.producer, grpcsync.OnceFunc(unref)
}
32 changes: 32 additions & 0 deletions internal/grpcsync/oncefunc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package grpcsync

import (
"sync"
)

// OnceFunc returns a function wrapping f which ensures f is only executed
// once even if the returned function is executed multiple times.
func OnceFunc(f func()) func() {
var once sync.Once
return func() {
once.Do(f)
}
}
5 changes: 5 additions & 0 deletions internal/testutils/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ func (tsc *TestSubConn) Connect() {
}
}

// GetOrBuildProducer is a no-op.
zasweq marked this conversation as resolved.
Show resolved Hide resolved
func (tsc *TestSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
return nil, nil
}

// String implements stringer to print human friendly error message.
func (tsc *TestSubConn) String() string {
return tsc.id
Expand Down
7 changes: 7 additions & 0 deletions orca/internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,15 @@
// avoid polluting the godoc of the top-level orca package.
package internal

import ibackoff "google.golang.org/grpc/internal/backoff"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this import to be renamed?

Copy link
Member Author

@dfawley dfawley Oct 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't; I just like to keep things consistent, and we have a backoff package and an internal/backoff package, and it's confusing if we reference both using the same name in different files.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Package names should ideally be self contained and self explanatory. If we are always going to rename this import, we should ideally rename the package itself.


// AllowAnyMinReportingInterval prevents clamping of the MinReportingInterval
// configured via ServiceOptions, to a minimum of 30s.
//
// For testing purposes only.
var AllowAnyMinReportingInterval interface{} // func(*ServiceOptions)

// DefaultBackoffFunc is used by the producer to control its backoff behavior.
//
// For testing purposes only.
var DefaultBackoffFunc = ibackoff.DefaultExponential.Backoff
Loading