diff --git a/cmd/spicedb/main.go b/cmd/spicedb/main.go index 60c721837a..94887af20a 100644 --- a/cmd/spicedb/main.go +++ b/cmd/spicedb/main.go @@ -4,7 +4,6 @@ import ( "errors" "os" - "github.com/cespare/xxhash/v2" "github.com/rs/zerolog" "github.com/sercand/kuberesolver/v3" "github.com/spf13/cobra" @@ -18,11 +17,6 @@ import ( "github.com/authzed/spicedb/pkg/cmd/testserver" ) -const ( - hashringReplicationFactor = 20 - backendsPerKey = 1 -) - var errParsing = errors.New("parsing error") func main() { @@ -30,11 +24,7 @@ func main() { kuberesolver.RegisterInCluster() // Enable consistent hashring gRPC load balancer - balancer.Register(consistentbalancer.NewConsistentHashringBuilder( - xxhash.Sum64, - hashringReplicationFactor, - backendsPerKey, - )) + balancer.Register(consistentbalancer.NewConsistentHashringBuilder(cmdutil.ConsistentHashringPicker)) log.SetGlobalLogger(zerolog.New(os.Stdout)) diff --git a/internal/services/integrationtesting/benchmark_test.go b/internal/services/integrationtesting/benchmark_test.go index 0bd9dd6b83..c0fda7b59e 100644 --- a/internal/services/integrationtesting/benchmark_test.go +++ b/internal/services/integrationtesting/benchmark_test.go @@ -190,7 +190,7 @@ func BenchmarkServices(b *testing.B) { }) brequire.NoError(err) - conn, cleanup := testserver.TestClusterWithDispatchAndCacheConfig(b, 1, ds, false /* no cache */) + conn, cleanup := testserver.TestClusterWithDispatchAndCacheConfig(b, 1, ds) b.Cleanup(cleanup) dsCtx := datastoremw.ContextWithHandle(context.Background()) diff --git a/internal/testserver/cluster.go b/internal/testserver/cluster.go index 1791fe3661..ea289a34da 100644 --- a/internal/testserver/cluster.go +++ b/internal/testserver/cluster.go @@ -59,7 +59,9 @@ var testResolverBuilder = &SafeManualResolverBuilder{} func init() { // register hashring balancer - balancer.Register(hashbalancer.NewConsistentHashringBuilder(xxhash.Sum64, 20, 1)) + balancer.Register(hashbalancer.NewConsistentHashringBuilder( + hashbalancer.NewConsistentHashringPickerBuilder(xxhash.Sum64, 1500, 1)), + ) // Register a manual resolver.Builder that we can feed addresses for tests // Registration is not thread safe, so we register a single resolver.Builder @@ -124,7 +126,7 @@ type SafeManualResolver struct { // ResolveNow implements the resolver.Resolver interface // It sends the static list of addresses to the underlying resolver.ClientConn -func (r *SafeManualResolver) ResolveNow(options resolver.ResolveNowOptions) { +func (r *SafeManualResolver) ResolveNow(_ resolver.ResolveNowOptions) { if r.cc == nil { return } @@ -139,11 +141,11 @@ func (r *SafeManualResolver) Close() {} // TestClusterWithDispatch creates a cluster with `size` nodes // The cluster has a real dispatch stack that uses bufconn grpc connections func TestClusterWithDispatch(t testing.TB, size uint, ds datastore.Datastore) ([]*grpc.ClientConn, func()) { - return TestClusterWithDispatchAndCacheConfig(t, size, ds, true) + return TestClusterWithDispatchAndCacheConfig(t, size, ds) } // TestClusterWithDispatchAndCacheConfig creates a cluster with `size` nodes and with cache toggled. -func TestClusterWithDispatchAndCacheConfig(t testing.TB, size uint, ds datastore.Datastore, cacheEnabled bool) ([]*grpc.ClientConn, func()) { +func TestClusterWithDispatchAndCacheConfig(t testing.TB, size uint, ds datastore.Datastore) ([]*grpc.ClientConn, func()) { // each cluster gets a unique prefix since grpc resolution is process-global prefix := getPrefix(t) diff --git a/pkg/balancer/hashring.go b/pkg/balancer/hashring.go index 11cdbf0148..150d79d4ce 100644 --- a/pkg/balancer/hashring.go +++ b/pkg/balancer/hashring.go @@ -30,17 +30,33 @@ const ( var logger = grpclog.Component("consistenthashring") // NewConsistentHashringBuilder creates a new balancer.Builder that -// will create a consistent hashring balancer with the given config. +// will create a consistent hashring balancer with the picker builder. // Before making a connection, register it with grpc with: // `balancer.Register(consistent.NewConsistentHashringBuilder(hasher, factor, spread))` -func NewConsistentHashringBuilder(hasher consistent.HasherFunc, replicationFactor uint16, spread uint8) balancer.Builder { +func NewConsistentHashringBuilder(pickerBuilder base.PickerBuilder) balancer.Builder { return base.NewBalancerBuilder( BalancerName, - &consistentHashringPickerBuilder{hasher: hasher, replicationFactor: replicationFactor, spread: spread}, + pickerBuilder, base.Config{HealthCheck: true}, ) } +// NewConsistentHashringPickerBuilder creates a new picker builder +// that will create consistent hashrings according to the supplied +// config. If the ReplicationFactor is changed, that new parameter +// will be used when the next picker is created. +func NewConsistentHashringPickerBuilder( + hasher consistent.HasherFunc, + initialReplicationFactor uint16, + spread uint8, +) *ConsistentHashringPickerBuilder { + return &ConsistentHashringPickerBuilder{ + hasher: hasher, + replicationFactor: initialReplicationFactor, + spread: spread, + } +} + type subConnMember struct { balancer.SubConn key string @@ -54,19 +70,32 @@ func (s subConnMember) Key() string { var _ consistent.Member = &subConnMember{} -type consistentHashringPickerBuilder struct { +// ConsistentHashringPickerBuilder is an implementation of base.PickerBuilder and +// is used to build pickers based on updates to the node architecture. +type ConsistentHashringPickerBuilder struct { + sync.Mutex + hasher consistent.HasherFunc replicationFactor uint16 spread uint8 } -func (b *consistentHashringPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { +func (b *ConsistentHashringPickerBuilder) ReplicationFactor(rf uint16) { + b.Lock() + defer b.Unlock() + b.replicationFactor = rf +} + +func (b *ConsistentHashringPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { logger.Infof("consistentHashringPicker: Build called with info: %v", info) if len(info.ReadySCs) == 0 { return base.NewErrPicker(balancer.ErrNoSubConnAvailable) } + b.Lock() hashring := consistent.MustNewHashring(b.hasher, b.replicationFactor) + b.Unlock() + for sc, scInfo := range info.ReadySCs { if err := hashring.Add(subConnMember{ SubConn: sc, @@ -106,3 +135,5 @@ func (p *consistentHashringPicker) Pick(info balancer.PickInfo) (balancer.PickRe SubConn: chosen.SubConn, }, nil } + +var _ base.PickerBuilder = &ConsistentHashringPickerBuilder{} diff --git a/pkg/cmd/serve.go b/pkg/cmd/serve.go index 72eee86e73..f80ce5c4dc 100644 --- a/pkg/cmd/serve.go +++ b/pkg/cmd/serve.go @@ -106,6 +106,8 @@ func RegisterServeFlags(cmd *cobra.Command, config *server.Config) error { cmd.Flags().Uint16Var(&config.DispatchConcurrencyLimits.LookupSubjects, "dispatch-lookup-subjects-concurrency-limit", 0, "maximum number of parallel goroutines to create for each lookup subjects request or subrequest. defaults to --dispatch-concurrency-limit") cmd.Flags().Uint16Var(&config.DispatchConcurrencyLimits.ReachableResources, "dispatch-reachable-resources-concurrency-limit", 0, "maximum number of parallel goroutines to create for each reachable resources request or subrequest. defaults to --dispatch-concurrency-limit") + cmd.Flags().Uint16Var(&config.DispatchHashringReplicationFactor, "dispatch-hashring-replication-factor", 100, "set the replication factor of the consistent hasher used for the dispatcher") + // Flags for configuring API behavior cmd.Flags().BoolVar(&config.DisableV1SchemaAPI, "disable-v1-schema-api", false, "disables the V1 schema API") cmd.Flags().BoolVar(&config.DisableVersionResponse, "disable-version-response", false, "disables version response support in the API") diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index ce1c8fced2..f7af497f9d 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -11,6 +11,7 @@ import ( "time" "github.com/authzed/grpcutil" + "github.com/cespare/xxhash/v2" grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth" grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus" "github.com/hashicorp/go-multierror" @@ -40,6 +41,17 @@ import ( "github.com/authzed/spicedb/pkg/datastore" ) +const ( + hashringReplicationFactor = 100 + backendsPerKey = 1 +) + +var ConsistentHashringPicker = balancer.NewConsistentHashringPickerBuilder( + xxhash.Sum64, + hashringReplicationFactor, + backendsPerKey, +) + //go:generate go run github.com/ecordell/optgen -output zz_generated.options.go . Config type Config struct { // API config @@ -67,18 +79,19 @@ type Config struct { SchemaPrefixesRequired bool // Dispatch options - DispatchServer util.GRPCServerConfig - DispatchMaxDepth uint32 - GlobalDispatchConcurrencyLimit uint16 - DispatchConcurrencyLimits graph.ConcurrencyLimits - DispatchUpstreamAddr string - DispatchUpstreamCAPath string - DispatchUpstreamTimeout time.Duration - DispatchClientMetricsEnabled bool - DispatchClientMetricsPrefix string - DispatchClusterMetricsEnabled bool - DispatchClusterMetricsPrefix string - Dispatcher dispatch.Dispatcher + DispatchServer util.GRPCServerConfig + DispatchMaxDepth uint32 + GlobalDispatchConcurrencyLimit uint16 + DispatchConcurrencyLimits graph.ConcurrencyLimits + DispatchUpstreamAddr string + DispatchUpstreamCAPath string + DispatchUpstreamTimeout time.Duration + DispatchClientMetricsEnabled bool + DispatchClientMetricsPrefix string + DispatchClusterMetricsEnabled bool + DispatchClusterMetricsPrefix string + Dispatcher dispatch.Dispatcher + DispatchHashringReplicationFactor uint16 DispatchCacheConfig CacheConfig ClusterDispatchCacheConfig CacheConfig @@ -238,6 +251,10 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) { } closeables.AddWithError(dispatcher.Close) + // Set this value to take effect the next time the replicas are updated + // Applies to ALL running servers. + ConsistentHashringPicker.ReplicationFactor(c.DispatchHashringReplicationFactor) + if len(c.DispatchUnaryMiddleware) == 0 && len(c.DispatchStreamingMiddleware) == 0 { if c.GRPCAuthFunc == nil { c.DispatchUnaryMiddleware, c.DispatchStreamingMiddleware = DefaultDispatchMiddleware(log.Logger, auth.MustRequirePresharedKey(c.PresharedKey), ds) diff --git a/pkg/cmd/server/zz_generated.options.go b/pkg/cmd/server/zz_generated.options.go index a3e4dd33e2..f4ea3dbf04 100644 --- a/pkg/cmd/server/zz_generated.options.go +++ b/pkg/cmd/server/zz_generated.options.go @@ -52,6 +52,7 @@ func (c *Config) ToOption() ConfigOption { to.DispatchClusterMetricsEnabled = c.DispatchClusterMetricsEnabled to.DispatchClusterMetricsPrefix = c.DispatchClusterMetricsPrefix to.Dispatcher = c.Dispatcher + to.DispatchHashringReplicationFactor = c.DispatchHashringReplicationFactor to.DispatchCacheConfig = c.DispatchCacheConfig to.ClusterDispatchCacheConfig = c.ClusterDispatchCacheConfig to.DisableV1SchemaAPI = c.DisableV1SchemaAPI @@ -275,6 +276,13 @@ func WithDispatcher(dispatcher dispatch.Dispatcher) ConfigOption { } } +// WithDispatchHashringReplicationFactor returns an option that can set DispatchHashringReplicationFactor on a Config +func WithDispatchHashringReplicationFactor(dispatchHashringReplicationFactor uint16) ConfigOption { + return func(c *Config) { + c.DispatchHashringReplicationFactor = dispatchHashringReplicationFactor + } +} + // WithDispatchCacheConfig returns an option that can set DispatchCacheConfig on a Config func WithDispatchCacheConfig(dispatchCacheConfig CacheConfig) ConfigOption { return func(c *Config) {