Skip to content

Commit

Permalink
cmd/server: configure hashring replication factor using a singleton p…
Browse files Browse the repository at this point in the history
…icker
  • Loading branch information
jakedt committed Apr 3, 2023
1 parent abdf7bd commit 882427b
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 33 deletions.
12 changes: 1 addition & 11 deletions cmd/spicedb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"os"

"github.com/cespare/xxhash/v2"
"github.com/rs/zerolog"
"github.com/sercand/kuberesolver/v3"
"github.com/spf13/cobra"
Expand All @@ -18,23 +17,14 @@ import (
"github.com/authzed/spicedb/pkg/cmd/testserver"
)

const (
hashringReplicationFactor = 20
backendsPerKey = 1
)

var errParsing = errors.New("parsing error")

func main() {
// Enable Kubernetes gRPC resolver
kuberesolver.RegisterInCluster()

// Enable consistent hashring gRPC load balancer
balancer.Register(consistentbalancer.NewConsistentHashringBuilder(
xxhash.Sum64,
hashringReplicationFactor,
backendsPerKey,
))
balancer.Register(consistentbalancer.NewConsistentHashringBuilder(cmdutil.ConsistentHashringPicker))

log.SetGlobalLogger(zerolog.New(os.Stdout))

Expand Down
2 changes: 1 addition & 1 deletion internal/services/integrationtesting/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func BenchmarkServices(b *testing.B) {
})
brequire.NoError(err)

conn, cleanup := testserver.TestClusterWithDispatchAndCacheConfig(b, 1, ds, false /* no cache */)
conn, cleanup := testserver.TestClusterWithDispatchAndCacheConfig(b, 1, ds)
b.Cleanup(cleanup)

dsCtx := datastoremw.ContextWithHandle(context.Background())
Expand Down
10 changes: 6 additions & 4 deletions internal/testserver/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ var testResolverBuilder = &SafeManualResolverBuilder{}

func init() {
// register hashring balancer
balancer.Register(hashbalancer.NewConsistentHashringBuilder(xxhash.Sum64, 20, 1))
balancer.Register(hashbalancer.NewConsistentHashringBuilder(
hashbalancer.NewConsistentHashringPickerBuilder(xxhash.Sum64, 1500, 1)),
)

// Register a manual resolver.Builder that we can feed addresses for tests
// Registration is not thread safe, so we register a single resolver.Builder
Expand Down Expand Up @@ -124,7 +126,7 @@ type SafeManualResolver struct {

// ResolveNow implements the resolver.Resolver interface
// It sends the static list of addresses to the underlying resolver.ClientConn
func (r *SafeManualResolver) ResolveNow(options resolver.ResolveNowOptions) {
func (r *SafeManualResolver) ResolveNow(_ resolver.ResolveNowOptions) {
if r.cc == nil {
return
}
Expand All @@ -139,11 +141,11 @@ func (r *SafeManualResolver) Close() {}
// TestClusterWithDispatch creates a cluster with `size` nodes
// The cluster has a real dispatch stack that uses bufconn grpc connections
func TestClusterWithDispatch(t testing.TB, size uint, ds datastore.Datastore) ([]*grpc.ClientConn, func()) {
return TestClusterWithDispatchAndCacheConfig(t, size, ds, true)
return TestClusterWithDispatchAndCacheConfig(t, size, ds)
}

// TestClusterWithDispatchAndCacheConfig creates a cluster with `size` nodes and with cache toggled.
func TestClusterWithDispatchAndCacheConfig(t testing.TB, size uint, ds datastore.Datastore, cacheEnabled bool) ([]*grpc.ClientConn, func()) {
func TestClusterWithDispatchAndCacheConfig(t testing.TB, size uint, ds datastore.Datastore) ([]*grpc.ClientConn, func()) {
// each cluster gets a unique prefix since grpc resolution is process-global
prefix := getPrefix(t)

Expand Down
41 changes: 36 additions & 5 deletions pkg/balancer/hashring.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,33 @@ const (
var logger = grpclog.Component("consistenthashring")

// NewConsistentHashringBuilder creates a new balancer.Builder that
// will create a consistent hashring balancer with the given config.
// will create a consistent hashring balancer with the picker builder.
// Before making a connection, register it with grpc with:
// `balancer.Register(consistent.NewConsistentHashringBuilder(hasher, factor, spread))`
func NewConsistentHashringBuilder(hasher consistent.HasherFunc, replicationFactor uint16, spread uint8) balancer.Builder {
func NewConsistentHashringBuilder(pickerBuilder base.PickerBuilder) balancer.Builder {
return base.NewBalancerBuilder(
BalancerName,
&consistentHashringPickerBuilder{hasher: hasher, replicationFactor: replicationFactor, spread: spread},
pickerBuilder,
base.Config{HealthCheck: true},
)
}

// NewConsistentHashringPickerBuilder creates a new picker builder
// that will create consistent hashrings according to the supplied
// config. If the ReplicationFactor is changed, that new parameter
// will be used when the next picker is created.
func NewConsistentHashringPickerBuilder(
hasher consistent.HasherFunc,
initialReplicationFactor uint16,
spread uint8,
) *ConsistentHashringPickerBuilder {
return &ConsistentHashringPickerBuilder{
hasher: hasher,
replicationFactor: initialReplicationFactor,
spread: spread,
}
}

type subConnMember struct {
balancer.SubConn
key string
Expand All @@ -54,19 +70,32 @@ func (s subConnMember) Key() string {

var _ consistent.Member = &subConnMember{}

type consistentHashringPickerBuilder struct {
// ConsistentHashringPickerBuilder is an implementation of base.PickerBuilder and
// is used to build pickers based on updates to the node architecture.
type ConsistentHashringPickerBuilder struct {
sync.Mutex

hasher consistent.HasherFunc
replicationFactor uint16
spread uint8
}

func (b *consistentHashringPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
func (b *ConsistentHashringPickerBuilder) ReplicationFactor(rf uint16) {
b.Lock()
defer b.Unlock()
b.replicationFactor = rf
}

func (b *ConsistentHashringPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
logger.Infof("consistentHashringPicker: Build called with info: %v", info)
if len(info.ReadySCs) == 0 {
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}

b.Lock()
hashring := consistent.MustNewHashring(b.hasher, b.replicationFactor)
b.Unlock()

for sc, scInfo := range info.ReadySCs {
if err := hashring.Add(subConnMember{
SubConn: sc,
Expand Down Expand Up @@ -106,3 +135,5 @@ func (p *consistentHashringPicker) Pick(info balancer.PickInfo) (balancer.PickRe
SubConn: chosen.SubConn,
}, nil
}

var _ base.PickerBuilder = &ConsistentHashringPickerBuilder{}
2 changes: 2 additions & 0 deletions pkg/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ func RegisterServeFlags(cmd *cobra.Command, config *server.Config) error {
cmd.Flags().Uint16Var(&config.DispatchConcurrencyLimits.LookupSubjects, "dispatch-lookup-subjects-concurrency-limit", 0, "maximum number of parallel goroutines to create for each lookup subjects request or subrequest. defaults to --dispatch-concurrency-limit")
cmd.Flags().Uint16Var(&config.DispatchConcurrencyLimits.ReachableResources, "dispatch-reachable-resources-concurrency-limit", 0, "maximum number of parallel goroutines to create for each reachable resources request or subrequest. defaults to --dispatch-concurrency-limit")

cmd.Flags().Uint16Var(&config.DispatchHashringReplicationFactor, "dispatch-hashring-replication-factor", 100, "set the replication factor of the consistent hasher used for the dispatcher")

// Flags for configuring API behavior
cmd.Flags().BoolVar(&config.DisableV1SchemaAPI, "disable-v1-schema-api", false, "disables the V1 schema API")
cmd.Flags().BoolVar(&config.DisableVersionResponse, "disable-version-response", false, "disables version response support in the API")
Expand Down
41 changes: 29 additions & 12 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/authzed/grpcutil"
"github.com/cespare/xxhash/v2"
grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/auth"
grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/hashicorp/go-multierror"
Expand Down Expand Up @@ -40,6 +41,17 @@ import (
"github.com/authzed/spicedb/pkg/datastore"
)

const (
hashringReplicationFactor = 100
backendsPerKey = 1
)

var ConsistentHashringPicker = balancer.NewConsistentHashringPickerBuilder(
xxhash.Sum64,
hashringReplicationFactor,
backendsPerKey,
)

//go:generate go run github.com/ecordell/optgen -output zz_generated.options.go . Config
type Config struct {
// API config
Expand Down Expand Up @@ -67,18 +79,19 @@ type Config struct {
SchemaPrefixesRequired bool

// Dispatch options
DispatchServer util.GRPCServerConfig
DispatchMaxDepth uint32
GlobalDispatchConcurrencyLimit uint16
DispatchConcurrencyLimits graph.ConcurrencyLimits
DispatchUpstreamAddr string
DispatchUpstreamCAPath string
DispatchUpstreamTimeout time.Duration
DispatchClientMetricsEnabled bool
DispatchClientMetricsPrefix string
DispatchClusterMetricsEnabled bool
DispatchClusterMetricsPrefix string
Dispatcher dispatch.Dispatcher
DispatchServer util.GRPCServerConfig
DispatchMaxDepth uint32
GlobalDispatchConcurrencyLimit uint16
DispatchConcurrencyLimits graph.ConcurrencyLimits
DispatchUpstreamAddr string
DispatchUpstreamCAPath string
DispatchUpstreamTimeout time.Duration
DispatchClientMetricsEnabled bool
DispatchClientMetricsPrefix string
DispatchClusterMetricsEnabled bool
DispatchClusterMetricsPrefix string
Dispatcher dispatch.Dispatcher
DispatchHashringReplicationFactor uint16

DispatchCacheConfig CacheConfig
ClusterDispatchCacheConfig CacheConfig
Expand Down Expand Up @@ -238,6 +251,10 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) {
}
closeables.AddWithError(dispatcher.Close)

// Set this value to take effect the next time the replicas are updated
// Applies to ALL running servers.
ConsistentHashringPicker.ReplicationFactor(c.DispatchHashringReplicationFactor)

if len(c.DispatchUnaryMiddleware) == 0 && len(c.DispatchStreamingMiddleware) == 0 {
if c.GRPCAuthFunc == nil {
c.DispatchUnaryMiddleware, c.DispatchStreamingMiddleware = DefaultDispatchMiddleware(log.Logger, auth.MustRequirePresharedKey(c.PresharedKey), ds)
Expand Down
8 changes: 8 additions & 0 deletions pkg/cmd/server/zz_generated.options.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 882427b

Please sign in to comment.