Skip to content

Commit

Permalink
introduces configurable dispatch hashring
Browse files Browse the repository at this point in the history
This new CLI flag allows parameterizing the replication factor
of the hashring used by the dispatch client-side gRPC balancer.

This was a bit more involved as the balancer registry is a global
singleton meant to be registered on an init block. This would have
required moving flag handling outside of cobra, which is not ideal.

As a solution I changed how the balancer is named, appending the replication
factor to the name, and introducing a lazy-load mechanism with a mutex:
- if a balancer with a specific replication factor does not exist, lock is
  acquired and the balancer is registered. This avoids races if multiple
  spicedb servers are running in the same process
- if a balancer exists already, it's not registered
- we do not unregister the balancer when the server is wound down, as
  it could affect another spicedb instance in the process using the
  same replication factor
  • Loading branch information
vroldanbet committed Mar 29, 2023
1 parent b5886f0 commit 34a4202
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 30 deletions.
15 changes: 0 additions & 15 deletions cmd/spicedb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,23 @@ import (
"errors"
"os"

"github.com/cespare/xxhash/v2"
"github.com/rs/zerolog"
"github.com/sercand/kuberesolver/v3"
"github.com/spf13/cobra"
"google.golang.org/grpc/balancer"
_ "google.golang.org/grpc/xds"

log "github.com/authzed/spicedb/internal/logging"
consistentbalancer "github.com/authzed/spicedb/pkg/balancer"
"github.com/authzed/spicedb/pkg/cmd"
cmdutil "github.com/authzed/spicedb/pkg/cmd/server"
"github.com/authzed/spicedb/pkg/cmd/testserver"
)

const (
// hashringReplicationFactor is the replication factor handed to the
// consistent hashring balancer builder that main registers.
hashringReplicationFactor = 20
// backendsPerKey is passed as the builder's third argument (its spread
// parameter).
backendsPerKey = 1
)

var errParsing = errors.New("parsing error")

func main() {
// Enable Kubernetes gRPC resolver
kuberesolver.RegisterInCluster()

// Enable consistent hashring gRPC load balancer
balancer.Register(consistentbalancer.NewConsistentHashringBuilder(
xxhash.Sum64,
hashringReplicationFactor,
backendsPerKey,
))

log.SetGlobalLogger(zerolog.New(os.Stdout))

// Create a root command
Expand Down
2 changes: 1 addition & 1 deletion internal/testserver/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestClusterWithDispatchAndCacheConfig(t testing.TB, size uint, ds datastore
combineddispatch.UpstreamAddr("test://" + prefix),
combineddispatch.PrometheusSubsystem(fmt.Sprintf("%s_%d_client_dispatch", prefix, i)),
combineddispatch.GrpcDialOpts(
grpc.WithDefaultServiceConfig(hashbalancer.BalancerServiceConfig),
grpc.WithDefaultServiceConfig(hashbalancer.ServiceConfigForBalancerName(hashbalancer.NameForReplicationFactor(100))),
grpc.WithContextDialer(func(ctx context.Context, s string) (net.Conn, error) {
// it's possible grpc tries to dial before we have set the
// buffconn dialers, we have to return a "TempError" so that
Expand Down
22 changes: 17 additions & 5 deletions pkg/balancer/hashring.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package balancer

import (
"fmt"
"math/rand"
"sync"
"time"
Expand All @@ -15,12 +16,12 @@ import (
type ctxKey string

const (
// BalancerName is the name of consistent-hashring balancer.
BalancerName = "consistent-hashring"
// name is the name of consistent-hashring balancer.
name = "consistent-hashring"

// BalancerServiceConfig is a service config that sets the default balancer
// serviceConfig is a service config that sets the default balancer
// to the consistent-hashring balancer
BalancerServiceConfig = `{"loadBalancingPolicy":"consistent-hashring"}`
serviceConfig = `{"loadBalancingPolicy":"%s"}`

// CtxKey is the key for the grpc request's context.Context which points to
// the key to hash for the request. The value it points to must be []byte
Expand All @@ -35,12 +36,23 @@ var logger = grpclog.Component("consistenthashring")
// `balancer.Register(consistent.NewConsistentHashringBuilder(hasher, factor, spread))`
func NewConsistentHashringBuilder(hasher consistent.HasherFunc, replicationFactor uint16, spread uint8) balancer.Builder {
// Wrap the hashring picker builder in a base balancer builder; hasher,
// replicationFactor and spread are handed to the picker builder unchanged.
return base.NewBalancerBuilder(
// NOTE(review): this diff view shows both sides of the change on adjacent
// lines - presumably BalancerName is the removed argument and the
// NameForReplicationFactor call is its replacement; confirm against the file.
BalancerName,
NameForReplicationFactor(replicationFactor),
&consistentHashringPickerBuilder{hasher: hasher, replicationFactor: replicationFactor, spread: spread},
// Health checking is enabled in the base balancer config.
base.Config{HealthCheck: true},
)
}

// NameForReplicationFactor builds the name under which the consistent-hashring
// balancer for the given replication factor is known: the base balancer name,
// followed by "-rf-" and the factor in decimal.
func NameForReplicationFactor(replicationFactor uint16) string {
	return fmt.Sprintf("%s-rf-%d", name, replicationFactor)
}

// ServiceConfigForBalancerName renders the gRPC service configuration string
// that selects balancerName as the load balancing policy. The name is
// substituted into the serviceConfig template verbatim (no escaping), so it is
// expected to come from NameForReplicationFactor.
func ServiceConfigForBalancerName(balancerName string) string {
	rendered := fmt.Sprintf(serviceConfig, balancerName)
	return rendered
}

type subConnMember struct {
balancer.SubConn
key string
Expand Down
3 changes: 1 addition & 2 deletions pkg/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,12 @@ func RegisterServeFlags(cmd *cobra.Command, config *server.Config) error {
cmd.Flags().StringVar(&config.DispatchUpstreamAddr, "dispatch-upstream-addr", "", "upstream grpc address to dispatch to")
cmd.Flags().StringVar(&config.DispatchUpstreamCAPath, "dispatch-upstream-ca-path", "", "local path to the TLS CA used when connecting to the dispatch cluster")
cmd.Flags().DurationVar(&config.DispatchUpstreamTimeout, "dispatch-upstream-timeout", 60*time.Second, "maximum duration of a dispatch call an upstream cluster before it times out")

cmd.Flags().Uint16Var(&config.GlobalDispatchConcurrencyLimit, "dispatch-concurrency-limit", 50, "maximum number of parallel goroutines to create for each request or subrequest")

cmd.Flags().Uint16Var(&config.DispatchConcurrencyLimits.Check, "dispatch-check-permission-concurrency-limit", 0, "maximum number of parallel goroutines to create for each check request or subrequest. defaults to --dispatch-concurrency-limit")
cmd.Flags().Uint16Var(&config.DispatchConcurrencyLimits.LookupResources, "dispatch-lookup-resources-concurrency-limit", 0, "maximum number of parallel goroutines to create for each lookup resources request or subrequest. defaults to --dispatch-concurrency-limit")
cmd.Flags().Uint16Var(&config.DispatchConcurrencyLimits.LookupSubjects, "dispatch-lookup-subjects-concurrency-limit", 0, "maximum number of parallel goroutines to create for each lookup subjects request or subrequest. defaults to --dispatch-concurrency-limit")
cmd.Flags().Uint16Var(&config.DispatchConcurrencyLimits.ReachableResources, "dispatch-reachable-resources-concurrency-limit", 0, "maximum number of parallel goroutines to create for each reachable resources request or subrequest. defaults to --dispatch-concurrency-limit")
cmd.Flags().Uint16Var(&config.DispatchHashringReplicationFactor, "dispatch-hashring-replication-factor", 100, "set the replication factor of the consistent hasher used for the dispatcher")

// Flags for configuring API behavior
cmd.Flags().BoolVar(&config.DisableV1SchemaAPI, "disable-v1-schema-api", false, "disables the V1 schema API")
Expand Down
36 changes: 29 additions & 7 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/authzed/grpcutil"
"github.com/cespare/xxhash/v2"
grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth"
grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/hashicorp/go-multierror"
Expand All @@ -19,6 +20,7 @@ import (
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
grpcbalancer "google.golang.org/grpc/balancer"

"github.com/authzed/spicedb/internal/auth"
"github.com/authzed/spicedb/internal/dashboard"
Expand All @@ -43,11 +45,12 @@ import (
//go:generate go run github.com/ecordell/optgen -output zz_generated.options.go . Config
type Config struct {
// API config
GRPCServer util.GRPCServerConfig
GRPCAuthFunc grpc_auth.AuthFunc
PresharedKey []string
ShutdownGracePeriod time.Duration
DisableVersionResponse bool
GRPCServer util.GRPCServerConfig
GRPCAuthFunc grpc_auth.AuthFunc
DispatchHashringReplicationFactor uint16
PresharedKey []string
ShutdownGracePeriod time.Duration
DisableVersionResponse bool

// GRPC Gateway config
HTTPGateway util.HTTPServerConfig
Expand Down Expand Up @@ -108,6 +111,10 @@ type Config struct {
TelemetryInterval time.Duration
}

// defaultBackendsPerKey is the spread value handed to the consistent hashring
// balancer builder when a dispatch balancer is registered.
const defaultBackendsPerKey = 1

// balancerRegistryMutex guards the check-then-register sequence performed
// against the gRPC balancer registry. The zero value is an unlocked mutex.
var balancerRegistryMutex sync.Mutex

type closeableStack struct {
closers []func() error
}
Expand Down Expand Up @@ -160,6 +167,19 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) {
}
}()

balancerRegistryMutex.Lock()
dispatchBalancerName := balancer.NameForReplicationFactor(c.DispatchHashringReplicationFactor)
if grpcbalancer.Get(dispatchBalancerName) == nil {
// Enable consistent hashring gRPC load balancer with a specific replication factor
grpcbalancer.Register(balancer.NewConsistentHashringBuilder(
xxhash.Sum64,
c.DispatchHashringReplicationFactor,
defaultBackendsPerKey,
))
log.Ctx(ctx).Debug().Uint16("replication-factor", c.DispatchHashringReplicationFactor).Msg("registered new grpc hashring balancer")
}
balancerRegistryMutex.Unlock()

if len(c.PresharedKey) < 1 && c.GRPCAuthFunc == nil {
return nil, fmt.Errorf("a preshared key must be provided to authenticate API requests")
}
Expand Down Expand Up @@ -217,15 +237,17 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) {

specificConcurrencyLimits := c.DispatchConcurrencyLimits
concurrencyLimits := specificConcurrencyLimits.WithOverallDefaultLimit(c.GlobalDispatchConcurrencyLimit)
log.Ctx(ctx).Info().EmbedObject(concurrencyLimits).Msg("configured dispatch concurrency limits")
log.Ctx(ctx).Info().EmbedObject(concurrencyLimits).
Uint16("hashring-replication-factor", c.DispatchHashringReplicationFactor).
Msg("configured dispatch concurrency limits")

dispatcher, err = combineddispatch.NewDispatcher(
combineddispatch.UpstreamAddr(c.DispatchUpstreamAddr),
combineddispatch.UpstreamCAPath(c.DispatchUpstreamCAPath),
combineddispatch.GrpcPresharedKey(dispatchPresharedKey),
combineddispatch.GrpcDialOpts(
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor()),
grpc.WithDefaultServiceConfig(balancer.BalancerServiceConfig),
grpc.WithDefaultServiceConfig(balancer.ServiceConfigForBalancerName(dispatchBalancerName)),
),
combineddispatch.MetricsEnabled(c.DispatchClientMetricsEnabled),
combineddispatch.PrometheusSubsystem(c.DispatchClientMetricsPrefix),
Expand Down
24 changes: 24 additions & 0 deletions pkg/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (

"github.com/authzed/spicedb/internal/datastore/memdb"
"github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/balancer"
"github.com/authzed/spicedb/pkg/cmd/util"

"github.com/stretchr/testify/require"
"go.uber.org/goleak"
grpcbalancer "google.golang.org/grpc/balancer"
)

func TestServerGracefulTermination(t *testing.T) {
Expand Down Expand Up @@ -48,6 +50,28 @@ func TestServerGracefulTermination(t *testing.T) {
<-ch
}

// TestBalancerRegistration completes a server config with a custom dispatch
// hashring replication factor and checks that a balancer is registered under
// the name derived from that factor, while nothing is registered under the
// name for a factor that was not requested.
func TestBalancerRegistration(t *testing.T) {
	defer goleak.VerifyNone(t, goleak.IgnoreCurrent())

	const (
		configuredFactor = 1000
		otherFactor      = 100
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	store, err := memdb.NewMemdbDatastore(0, 1*time.Second, 10*time.Second)
	require.NoError(t, err)

	cfg := ConfigWithOptions(
		&Config{},
		WithPresharedKey("psk"),
		WithDatastore(store),
		WithDispatchHashringReplicationFactor(configuredFactor),
	)

	server, err := cfg.Complete(ctx)
	require.NoError(t, err)

	// Shut the server down before inspecting the registry; the registration
	// is expected to outlive the server.
	closeServer := server.(*completedServerConfig).closeFunc
	require.NoError(t, closeServer())

	registeredName := balancer.NameForReplicationFactor(configuredFactor)
	absentName := balancer.NameForReplicationFactor(otherFactor)
	require.NotNil(t, grpcbalancer.Get(registeredName))
	require.Nil(t, grpcbalancer.Get(absentName))
}

func TestServerGracefulTerminationOnError(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())

Expand Down
8 changes: 8 additions & 0 deletions pkg/cmd/server/zz_generated.options.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 34a4202

Please sign in to comment.