Skip to content

Commit

Permalink
xds: add support for HTTP filters (gRFC A39) (#4206)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Feb 25, 2021
1 parent c8cef76 commit 60843b1
Show file tree
Hide file tree
Showing 18 changed files with 1,342 additions and 165 deletions.
32 changes: 32 additions & 0 deletions internal/resolver/config_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,38 @@ type RPCConfig struct {
Context context.Context
MethodConfig serviceconfig.MethodConfig // configuration to use for this RPC
OnCommitted func() // Called when the RPC has been committed (retries no longer possible)
Interceptor ClientInterceptor
}

// ClientStream will ultimately be a superset of grpc.ClientStream as
// operations become necessary to support.
type ClientStream interface {
// Done is invoked when the RPC is finished using its connection, or could
// not be assigned a connection. RPC operations may still occur on
// ClientStream after done is called, since the interceptor is invoked by
// application-layer operations.
Done()
}

// NOPClientStream is a ClientStream that does nothing
type NOPClientStream struct{}

// Done is a nop.
func (NOPClientStream) Done() {}

var _ ClientStream = NOPClientStream{}

// ClientInterceptor is an interceptor for gRPC client streams.
type ClientInterceptor interface {
// NewStream can intercept ClientStream calls. The provided ClientStream
// should not be used during NewStream. RPCInfo.Context should not be used
// (will be nil).
NewStream(context.Context, RPCInfo, ClientStream) (context.Context, ClientStream, error)
}

// ServerInterceptor is unimplementable; do not use.
type ServerInterceptor interface {
notDefined()
}

type csKeyType string
Expand Down
4 changes: 4 additions & 0 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
buf: newRecvBuffer(),
headerChan: make(chan struct{}),
contentSubtype: callHdr.ContentSubtype,
doneFunc: callHdr.DoneFunc,
}
s.wq = newWriteQuota(defaultWriteQuota, s.done)
s.requestRead = func(n int) {
Expand Down Expand Up @@ -832,6 +833,9 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
// This will unblock write.
close(s.done)
if s.doneFunc != nil {
s.doneFunc()
}
}

// Close kicks off the shutdown process of the transport. This should be called
Expand Down
3 changes: 3 additions & 0 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ type Stream struct {
ctx context.Context // the associated context of the stream
cancel context.CancelFunc // always nil for client side Stream
done chan struct{} // closed at the end of stream to unblock writers. On the client side.
doneFunc func() // invoked at the end of stream on client side.
ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
method string // the associated RPC method of the stream
recvCompress string
Expand Down Expand Up @@ -611,6 +612,8 @@ type CallHdr struct {
ContentSubtype string

PreviousAttempts int // value of grpc-previous-rpc-attempts header to set

DoneFunc func() // called when the stream is finished
}

// ClientTransport is the common interface for all gRPC client-side transport
Expand Down
14 changes: 13 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,27 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth

var mc serviceconfig.MethodConfig
var onCommit func()
rpcConfig, err := cc.safeConfigSelector.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: method})
rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
if err != nil {
return nil, status.Convert(err).Err()
}
var doneFunc func()
if rpcConfig != nil {
if rpcConfig.Context != nil {
ctx = rpcConfig.Context
}
mc = rpcConfig.MethodConfig
onCommit = rpcConfig.OnCommitted
if rpcConfig.Interceptor != nil {
rpcInfo.Context = nil
newCtx, cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, iresolver.NOPClientStream{})
if err != nil {
return nil, status.Convert(err).Err()
}
ctx = newCtx
doneFunc = cs.Done
}
}

if mc.WaitForReady != nil {
Expand Down Expand Up @@ -223,6 +234,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
Host: cc.authority,
Method: method,
ContentSubtype: c.contentSubtype,
DoneFunc: doneFunc,
}

// Set our outgoing compression according to the UseCompressor CallOption, if
Expand Down
42 changes: 39 additions & 3 deletions xds/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*
*/

// Package client implementation a full fledged gRPC client for the xDS API
// used by the xds resolver and balancer implementations.
// Package client implements a full fledged gRPC client for the xDS API used by
// the xds resolver and balancer implementations.
package client

import (
Expand All @@ -33,6 +33,7 @@ import (
"google.golang.org/protobuf/types/known/anypb"

"google.golang.org/grpc/xds/internal/client/load"
"google.golang.org/grpc/xds/internal/httpfilter"

"google.golang.org/grpc"
"google.golang.org/grpc/internal/backoff"
Expand Down Expand Up @@ -199,11 +200,27 @@ type ListenerUpdate struct {
// common_http_protocol_options.max_stream_duration field, or zero if
// unset.
MaxStreamDuration time.Duration
// HTTPFilters is a list of HTTP filters (name, config) from the LDS
// response.
HTTPFilters []HTTPFilter

// Raw is the resource from the xds response.
Raw *anypb.Any
}

// HTTPFilter represents one HTTP filter from an LDS response's HTTP connection
// manager field.
type HTTPFilter struct {
// Name is an arbitrary name of the filter. Used for applying override
// settings in virtual host / route / weighted cluster configuration (not
// yet supported).
Name string
// Filter is the HTTP filter found in the registry for the config type.
Filter httpfilter.Filter
// Config contains the filter's configuration
Config httpfilter.FilterConfig
}

func (lu *ListenerUpdate) String() string {
return fmt.Sprintf("{RouteConfigName: %q, SecurityConfig: %+v", lu.RouteConfigName, lu.SecurityCfg)
}
Expand All @@ -226,6 +243,11 @@ type VirtualHost struct {
// Routes contains a list of routes, each containing matchers and
// corresponding action.
Routes []*Route
// HTTPFilterConfigOverride contains any HTTP filter config overrides for
// the virtual host which may be present. An individual filter's override
// may be unused if the matching Route contains an override for that
// filter.
HTTPFilterConfigOverride map[string]httpfilter.FilterConfig
}

// Route is both a specification of how to match a request as well as an
Expand All @@ -239,13 +261,27 @@ type Route struct {
Fraction *uint32

// If the matchers above indicate a match, the below configuration is used.
Action map[string]uint32 // action is weighted clusters.
WeightedClusters map[string]WeightedCluster
// If MaxStreamDuration is nil, it indicates neither of the route action's
// max_stream_duration fields (grpc_timeout_header_max nor
// max_stream_duration) were set. In this case, the ListenerUpdate's
// MaxStreamDuration field should be used. If MaxStreamDuration is set to
// an explicit zero duration, the application's deadline should be used.
MaxStreamDuration *time.Duration
// HTTPFilterConfigOverride contains any HTTP filter config overrides for
// the route which may be present. An individual filter's override may be
// unused if the matching WeightedCluster contains an override for that
// filter.
HTTPFilterConfigOverride map[string]httpfilter.FilterConfig
}

// WeightedCluster contains settings for an xds RouteAction.WeightedCluster.
type WeightedCluster struct {
// Weight is the relative weight of the cluster. It will never be zero.
Weight uint32
// HTTPFilterConfigOverride contains any HTTP filter config overrides for
// the weighted cluster which may be present.
HTTPFilterConfigOverride map[string]httpfilter.FilterConfig
}

// HeaderMatcher represents header matchers.
Expand Down
Loading

0 comments on commit 60843b1

Please sign in to comment.