diff --git a/.golangci.yml b/.golangci.yml index 5cb97c4887..8ecd89e592 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -212,6 +212,9 @@ linters: # We allow cuddling assignment following conditions because there are valid # logical groupings for this use-case (e.g. when evaluating config values). - wsl + # New line required before return would require a large fraction of the + # code base to need updating, it's not worth the perceived benefit. + - nlreturn disable-all: false presets: # bodyclose, errcheck, gosec, govet, scopelint, staticcheck, typecheck diff --git a/src/cluster/client/etcd/client.go b/src/cluster/client/etcd/client.go index c3218c2edf..88f85e8dee 100644 --- a/src/cluster/client/etcd/client.go +++ b/src/cluster/client/etcd/client.go @@ -39,8 +39,8 @@ import ( "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/retry" - "go.etcd.io/etcd/clientv3" "github.com/uber-go/tally" + "go.etcd.io/etcd/clientv3" "go.uber.org/zap" ) @@ -282,9 +282,10 @@ func newClient(cluster Cluster) (*clientv3.Client, error) { return nil, err } cfg := clientv3.Config{ + AutoSyncInterval: cluster.AutoSyncInterval(), + DialTimeout: cluster.DialTimeout(), Endpoints: cluster.Endpoints(), TLS: tls, - AutoSyncInterval: cluster.AutoSyncInterval(), } if opts := cluster.KeepAliveOptions(); opts.KeepAliveEnabled() { @@ -296,6 +297,7 @@ func newClient(cluster Cluster) (*clientv3.Client, error) { } cfg.DialKeepAliveTime = keepAlivePeriod cfg.DialKeepAliveTimeout = opts.KeepAliveTimeout() + cfg.PermitWithoutStream = true } return clientv3.New(cfg) diff --git a/src/cluster/client/etcd/config_test.go b/src/cluster/client/etcd/config_test.go index 62c6af66eb..f2c5bb2f5a 100644 --- a/src/cluster/client/etcd/config_test.go +++ b/src/cluster/client/etcd/config_test.go @@ -138,9 +138,9 @@ m3sd: require.True(t, exists) keepAliveOpts = cluster2.KeepAliveOptions() require.Equal(t, true, keepAliveOpts.KeepAliveEnabled()) - require.Equal(t, 5*time.Minute, keepAliveOpts.KeepAlivePeriod()) - require.Equal(t, 5*time.Minute, keepAliveOpts.KeepAlivePeriodMaxJitter()) - require.Equal(t, 20*time.Second, keepAliveOpts.KeepAliveTimeout()) + require.Equal(t, 20*time.Second, keepAliveOpts.KeepAlivePeriod()) + require.Equal(t, 10*time.Second, keepAliveOpts.KeepAlivePeriodMaxJitter()) + require.Equal(t, 10*time.Second, keepAliveOpts.KeepAliveTimeout()) t.Run("TestOptionsNewDirectoryMode", func(t *testing.T) { opts := cfg.NewOptions() diff --git a/src/cluster/client/etcd/options.go b/src/cluster/client/etcd/options.go index e6e3c719de..566d240e10 100644 --- a/src/cluster/client/etcd/options.go +++ b/src/cluster/client/etcd/options.go @@ -36,10 +36,13 @@ import ( ) const ( + defaultAutoSyncInterval = 1 * time.Minute + defaultDialTimeout = 15 * time.Second + defaultKeepAliveEnabled = true - defaultKeepAlivePeriod = 5 * time.Minute - defaultKeepAlivePeriodMaxJitter = 5 * time.Minute - defaultKeepAliveTimeout = 20 * time.Second + defaultKeepAlivePeriod = 20 * time.Second + defaultKeepAlivePeriodMaxJitter = 10 * time.Second + defaultKeepAliveTimeout = 10 * time.Second defaultRetryInitialBackoff = 2 * time.Second defaultRetryBackoffFactor = 2.0 @@ -316,8 +319,10 @@ func (o options) NewDirectoryMode() os.FileMode { // NewCluster creates a Cluster. func NewCluster() Cluster { return cluster{ - keepAliveOpts: NewKeepAliveOptions(), - tlsOpts: NewTLSOptions(), + autoSyncInterval: defaultAutoSyncInterval, + dialTimeout: defaultDialTimeout, + keepAliveOpts: NewKeepAliveOptions(), + tlsOpts: NewTLSOptions(), } } @@ -327,6 +332,7 @@ type cluster struct { keepAliveOpts KeepAliveOptions tlsOpts TLSOptions autoSyncInterval time.Duration + dialTimeout time.Duration } func (c cluster) Zone() string { @@ -373,3 +379,15 @@ func (c cluster) SetAutoSyncInterval(autoSyncInterval time.Duration) Cluster { c.autoSyncInterval = autoSyncInterval return c } + +//nolint:gocritic +func (c cluster) DialTimeout() time.Duration { + return c.dialTimeout +} + +//nolint:gocritic +func (c cluster) SetDialTimeout(dialTimeout time.Duration) Cluster { + c.dialTimeout = dialTimeout + + return c +} diff --git a/src/cluster/client/etcd/options_test.go b/src/cluster/client/etcd/options_test.go index aee7366fab..befd638960 100644 --- a/src/cluster/client/etcd/options_test.go +++ b/src/cluster/client/etcd/options_test.go @@ -32,16 +32,22 @@ import ( ) func TestKeepAliveOptions(t *testing.T) { - opts := NewKeepAliveOptions(). + opts := NewKeepAliveOptions() + require.Equal(t, defaultKeepAliveEnabled, opts.KeepAliveEnabled()) + require.Equal(t, defaultKeepAlivePeriod, opts.KeepAlivePeriod()) + require.Equal(t, defaultKeepAlivePeriodMaxJitter, opts.KeepAlivePeriodMaxJitter()) + require.Equal(t, defaultKeepAliveTimeout, opts.KeepAliveTimeout()) + + opts = NewKeepAliveOptions(). SetKeepAliveEnabled(true). - SetKeepAlivePeriod(10 * time.Second). - SetKeepAlivePeriodMaxJitter(5 * time.Second). - SetKeepAliveTimeout(time.Second) + SetKeepAlivePeriod(1234 * time.Second). + SetKeepAlivePeriodMaxJitter(5000 * time.Second). + SetKeepAliveTimeout(time.Hour) require.Equal(t, true, opts.KeepAliveEnabled()) - require.Equal(t, 10*time.Second, opts.KeepAlivePeriod()) - require.Equal(t, 5*time.Second, opts.KeepAlivePeriodMaxJitter()) - require.Equal(t, time.Second, opts.KeepAliveTimeout()) + require.Equal(t, 1234*time.Second, opts.KeepAlivePeriod()) + require.Equal(t, 5000*time.Second, opts.KeepAlivePeriodMaxJitter()) + require.Equal(t, time.Hour, opts.KeepAliveTimeout()) } func TestCluster(t *testing.T) { @@ -63,6 +69,22 @@ func TestCluster(t *testing.T) { assert.Equal(t, "z", c.Zone()) assert.Equal(t, []string{"e1"}, c.Endpoints()) assert.Equal(t, aOpts, c.TLSOptions()) + assert.Equal(t, defaultAutoSyncInterval, c.AutoSyncInterval()) + assert.Equal(t, defaultDialTimeout, c.DialTimeout()) + + c = c.SetAutoSyncInterval(123 * time.Minute) + assert.Equal(t, "z", c.Zone()) + assert.Equal(t, []string{"e1"}, c.Endpoints()) + assert.Equal(t, aOpts, c.TLSOptions()) + assert.Equal(t, 123*time.Minute, c.AutoSyncInterval()) + assert.Equal(t, defaultDialTimeout, c.DialTimeout()) + + c = c.SetDialTimeout(42 * time.Hour) + assert.Equal(t, "z", c.Zone()) + assert.Equal(t, []string{"e1"}, c.Endpoints()) + assert.Equal(t, aOpts, c.TLSOptions()) + assert.Equal(t, 123*time.Minute, c.AutoSyncInterval()) + assert.Equal(t, 42*time.Hour, c.DialTimeout()) } func TestTLSOptions(t *testing.T) { diff --git a/src/cluster/client/etcd/types.go b/src/cluster/client/etcd/types.go index ffdc284bac..9c052d1e4f 100644 --- a/src/cluster/client/etcd/types.go +++ b/src/cluster/client/etcd/types.go @@ -129,6 +129,9 @@ type Cluster interface { TLSOptions() TLSOptions SetTLSOptions(TLSOptions) Cluster - SetAutoSyncInterval(value time.Duration) Cluster AutoSyncInterval() time.Duration + SetAutoSyncInterval(value time.Duration) Cluster + + DialTimeout() time.Duration + SetDialTimeout(value time.Duration) Cluster } diff --git a/src/cluster/placement/service/mirrored_custom_groups_test.go b/src/cluster/placement/service/mirrored_custom_groups_test.go index a27c9df514..1d624aaaef 100644 --- a/src/cluster/placement/service/mirrored_custom_groups_test.go +++ b/src/cluster/placement/service/mirrored_custom_groups_test.go @@ -59,7 +59,6 @@ const ( instG3I1 = "g3_i1" instG3I2 = "g3_i2" instG3I3 = "g3_i3" - ) var ( @@ -205,7 +204,7 @@ func mirroredCustomGroupSelectorSetup(t *testing.T) *mirroredCustomGroupSelector tctx.Groups = testGroups opts := placement.NewOptions(). - SetValidZone(zone). + SetValidZone(zone). SetIsMirrored(true) tctx.Selector = selector.NewMirroredCustomGroupSelector( @@ -217,7 +216,7 @@ func mirroredCustomGroupSelectorSetup(t *testing.T) *mirroredCustomGroupSelector tctx.KVStore = mem.NewStore() tctx.Storage = placementstorage.NewPlacementStorage(tctx.KVStore, "placement", tctx.Opts) - tctx.Service = NewPlacementService(tctx.Storage, tctx.Opts) + tctx.Service = NewPlacementService(tctx.Storage, WithPlacementOptions(tctx.Opts)) return tctx } diff --git a/src/cluster/placement/service/operator.go b/src/cluster/placement/service/operator.go index 75da980fc4..5d253969cc 100644 --- a/src/cluster/placement/service/operator.go +++ b/src/cluster/placement/service/operator.go @@ -31,10 +31,10 @@ import ( // given placement. // If initialPlacement is nil, BuildInitialPlacement must be called before any operations on the // placement. -func NewPlacementOperator(initialPlacement placement.Placement, opts placement.Options) placement.Operator { +func NewPlacementOperator(initialPlacement placement.Placement, opts ...Option) placement.Operator { store := newDummyStore(initialPlacement) return &placementOperator{ - placementServiceImpl: newPlacementServiceImpl(opts, store), + placementServiceImpl: newPlacementServiceImpl(store, opts...), store: store, } } @@ -97,4 +97,3 @@ func (d *dummyStore) Placement() (placement.Placement, error) { } return d.curPlacement, nil } - diff --git a/src/cluster/placement/service/operator_test.go b/src/cluster/placement/service/operator_test.go index af3b1b8688..3fcf1132e0 100644 --- a/src/cluster/placement/service/operator_test.go +++ b/src/cluster/placement/service/operator_test.go @@ -25,7 +25,6 @@ import ( "testing" "github.com/m3db/m3/src/cluster/placement" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -33,13 +32,13 @@ import ( func TestOperator(t *testing.T) { type testDeps struct { options placement.Options - op placement.Operator + op placement.Operator } setup := func(t *testing.T) testDeps { options := placement.NewOptions().SetAllowAllZones(true) return testDeps{ options: options, - op: NewPlacementOperator(nil, options), + op: NewPlacementOperator(nil, WithPlacementOptions(options)), } } @@ -60,7 +59,7 @@ func TestOperator(t *testing.T) { t.Run("end-to-end flow", func(t *testing.T) { tdeps := setup(t) - op := NewPlacementOperator(nil, tdeps.options) + op := NewPlacementOperator(nil, WithPlacementOptions(tdeps.options)) store := newMockStorage() pl, err := op.BuildInitialPlacement([]placement.Instance{newTestInstance()}, 10, 1) @@ -81,7 +80,7 @@ func TestOperator(t *testing.T) { require.NoError(t, err) // expect exactly one version increment, from store.SetIfNotExist - assert.Equal(t, initialVersion + 1, pl.Version()) + assert.Equal(t, initialVersion+1, pl.Version()) // spot check the results allAvailable := true @@ -94,15 +93,15 @@ func TestOperator(t *testing.T) { }) } -type dummyStoreTestDeps struct{ +type dummyStoreTestDeps struct { store *dummyStore - pl placement.Placement + pl placement.Placement } func dummyStoreSetup(t *testing.T) dummyStoreTestDeps { return dummyStoreTestDeps{ store: newDummyStore(nil), - pl: placement.NewPlacement(), + pl: placement.NewPlacement(), } } diff --git a/src/cluster/placement/service/service.go b/src/cluster/placement/service/service.go index 67873db30f..2ee4331c9c 100644 --- a/src/cluster/placement/service/service.go +++ b/src/cluster/placement/service/service.go @@ -27,7 +27,6 @@ import ( "github.com/m3db/m3/src/cluster/placement/algo" "github.com/m3db/m3/src/cluster/placement/selector" "github.com/m3db/m3/src/cluster/shard" - "go.uber.org/zap" ) @@ -37,36 +36,79 @@ type placementService struct { } // NewPlacementService returns an instance of placement service. -func NewPlacementService(s placement.Storage, opts placement.Options) placement.Service { +func NewPlacementService(s placement.Storage, opts ...Option) placement.Service { return &placementService{ Storage: s, placementServiceImpl: newPlacementServiceImpl( - opts, s, - + opts..., ), } } +type options struct { + placementAlgorithm placement.Algorithm + placementOpts placement.Options +} + +// Option is an interface for PlacementService options. +type Option interface { + apply(*options) +} + +// WithAlgorithm sets the algorithm implementation that will be used by PlacementService. +func WithAlgorithm(algo placement.Algorithm) Option { + return &algorithmOption{placementAlgorithm: algo} +} + +type algorithmOption struct { + placementAlgorithm placement.Algorithm +} + +func (a *algorithmOption) apply(opts *options) { + opts.placementAlgorithm = a.placementAlgorithm +} + +type placementOptionsOption struct { + opts placement.Options +} + +func (a *placementOptionsOption) apply(opts *options) { + opts.placementOpts = a.opts +} + +// WithPlacementOptions sets the placement options for PlacementService. +func WithPlacementOptions(opts placement.Options) Option { + return &placementOptionsOption{opts: opts} +} + func newPlacementServiceImpl( - opts placement.Options, storage minimalPlacementStorage, + opts ...Option, ) *placementServiceImpl { - if opts == nil { - opts = placement.NewOptions() + o := options{ + placementOpts: placement.NewOptions(), + } + + for _, opt := range opts { + opt.apply(&o) + } + + if o.placementAlgorithm == nil { + o.placementAlgorithm = algo.NewAlgorithm(o.placementOpts) } - instanceSelector := opts.InstanceSelector() + instanceSelector := o.placementOpts.InstanceSelector() if instanceSelector == nil { - instanceSelector = selector.NewInstanceSelector(opts) + instanceSelector = selector.NewInstanceSelector(o.placementOpts) } return &placementServiceImpl{ store: storage, - opts: opts, - algo: algo.NewAlgorithm(opts), + opts: o.placementOpts, + algo: o.placementAlgorithm, selector: instanceSelector, - logger: opts.InstrumentOptions().Logger(), + logger: o.placementOpts.InstrumentOptions().Logger(), } } diff --git a/src/cluster/placement/service/service_test.go b/src/cluster/placement/service/service_test.go index 5ebe2b49db..344454df28 100644 --- a/src/cluster/placement/service/service_test.go +++ b/src/cluster/placement/service/service_test.go @@ -26,15 +26,16 @@ import ( "github.com/m3db/m3/src/cluster/kv/mem" "github.com/m3db/m3/src/cluster/placement" + "github.com/m3db/m3/src/cluster/placement/algo" "github.com/m3db/m3/src/cluster/placement/storage" "github.com/m3db/m3/src/cluster/shard" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestGoodWorkflow(t *testing.T) { - p := NewPlacementService(newMockStorage(), placement.NewOptions().SetValidZone("z1")) + p := NewPlacementService(newMockStorage(), + WithPlacementOptions(placement.NewOptions().SetValidZone("z1"))) testGoodWorkflow(t, p) } @@ -145,7 +146,8 @@ func assertPlacementInstanceEqualExceptShards( } func TestNonShardedWorkflow(t *testing.T) { - ps := NewPlacementService(newMockStorage(), placement.NewOptions().SetValidZone("z1").SetIsSharded(false)) + ps := NewPlacementService(newMockStorage(), + WithPlacementOptions(placement.NewOptions().SetValidZone("z1").SetIsSharded(false))) _, err := ps.BuildInitialPlacement([]placement.Instance{ placement.NewEmptyInstance("i1", "r1", "z1", "e1", 1), @@ -206,7 +208,8 @@ func TestNonShardedWorkflow(t *testing.T) { } func TestBadInitialPlacement(t *testing.T) { - p := NewPlacementService(newMockStorage(), placement.NewOptions().SetValidZone("z1").SetIsSharded(false)) + p := NewPlacementService(newMockStorage(), + WithPlacementOptions(placement.NewOptions().SetValidZone("z1").SetIsSharded(false))) // invalid numShards _, err := p.BuildInitialPlacement([]placement.Instance{ @@ -229,7 +232,8 @@ func TestBadInitialPlacement(t *testing.T) { }, 10, 1) assert.Error(t, err) - p = NewPlacementService(newMockStorage(), placement.NewOptions().SetValidZone("z1")) + p = NewPlacementService(newMockStorage(), + WithPlacementOptions(placement.NewOptions().SetValidZone("z1"))) // Not enough instances. _, err = p.BuildInitialPlacement([]placement.Instance{}, 10, 1) @@ -264,7 +268,8 @@ func TestBadInitialPlacement(t *testing.T) { } func TestBadAddReplica(t *testing.T) { - p := NewPlacementService(newMockStorage(), placement.NewOptions().SetValidZone("z1")) + p := NewPlacementService(newMockStorage(), + WithPlacementOptions(placement.NewOptions().SetValidZone("z1"))) _, err := p.BuildInitialPlacement( []placement.Instance{placement.NewEmptyInstance("i1", "r1", "z1", "endpoint", 1)}, @@ -276,14 +281,16 @@ func TestBadAddReplica(t *testing.T) { assert.Error(t, err) // Could not find placement for service. - p = NewPlacementService(newMockStorage(), placement.NewOptions().SetValidZone("z1")) + p = NewPlacementService(newMockStorage(), + WithPlacementOptions(placement.NewOptions().SetValidZone("z1"))) _, err = p.AddReplica() assert.Error(t, err) } func TestBadAddInstance(t *testing.T) { ms := newMockStorage() - p := NewPlacementService(ms, placement.NewOptions().SetValidZone("z1")) + p := NewPlacementService(ms, + WithPlacementOptions(placement.NewOptions().SetValidZone("z1"))) _, err := p.BuildInitialPlacement( []placement.Instance{placement.NewEmptyInstance("i1", "r1", "z1", "endpoint", 1)}, @@ -298,18 +305,21 @@ func TestBadAddInstance(t *testing.T) { _, _, err = p.AddInstances([]placement.Instance{placement.NewEmptyInstance("i2", "r2", "z2", "endpoint", 1)}) assert.Error(t, err) - p = NewPlacementService(ms, placement.NewOptions().SetValidZone("z1")) + p = NewPlacementService(ms, + WithPlacementOptions(placement.NewOptions().SetValidZone("z1"))) _, _, err = p.AddInstances([]placement.Instance{placement.NewEmptyInstance("i1", "r1", "z1", "endpoint", 1)}) assert.Error(t, err) // could not find placement for service - p = NewPlacementService(newMockStorage(), placement.NewOptions().SetValidZone("z1")) + p = NewPlacementService(newMockStorage(), + WithPlacementOptions(placement.NewOptions().SetValidZone("z1"))) _, _, err = p.AddInstances([]placement.Instance{placement.NewEmptyInstance("i2", "r2", "z1", "endpoint", 1)}) assert.Error(t, err) } func TestBadRemoveInstance(t *testing.T) { - p := NewPlacementService(newMockStorage(), placement.NewOptions().SetValidZone("z1")) + p := NewPlacementService(newMockStorage(), + WithPlacementOptions(placement.NewOptions().SetValidZone("z1"))) _, err := p.BuildInitialPlacement( []placement.Instance{placement.NewEmptyInstance("i1", "r1", "z1", "endpoint", 1)}, @@ -325,13 +335,15 @@ func TestBadRemoveInstance(t *testing.T) { assert.Error(t, err) // Could not find placement for service. - p = NewPlacementService(newMockStorage(), placement.NewOptions().SetValidZone("z1")) + p = NewPlacementService(newMockStorage(), + WithPlacementOptions(placement.NewOptions().SetValidZone("z1"))) _, err = p.RemoveInstances([]string{"i1"}) assert.Error(t, err) } func TestBadReplaceInstance(t *testing.T) { - p := NewPlacementService(newMockStorage(), placement.NewOptions().SetValidZone("z1")) + p := NewPlacementService(newMockStorage(), + WithPlacementOptions(placement.NewOptions().SetValidZone("z1"))) _, err := p.BuildInitialPlacement([]placement.Instance{ placement.NewEmptyInstance("i1", "r1", "z1", "endpoint", 1), @@ -363,7 +375,8 @@ func TestBadReplaceInstance(t *testing.T) { assert.Error(t, err) // Could not find placement for service. - p = NewPlacementService(newMockStorage(), placement.NewOptions().SetValidZone("z1")) + p = NewPlacementService(newMockStorage(), + WithPlacementOptions(placement.NewOptions().SetValidZone("z1"))) _, _, err = p.ReplaceInstances( []string{"i1"}, []placement.Instance{placement.NewEmptyInstance("i2", "r2", "z1", "endpoint", 1)}, @@ -406,7 +419,8 @@ func TestMarkShard(t *testing.T) { _, err := ms.SetIfNotExist(p) assert.NoError(t, err) - ps := NewPlacementService(ms, placement.NewOptions().SetValidZone("z1")) + ps := NewPlacementService(ms, + WithPlacementOptions(placement.NewOptions().SetValidZone("z1"))) _, err = ps.MarkShardsAvailable("i5", 1) assert.NoError(t, err) p, err = ms.Placement() @@ -461,7 +475,7 @@ func TestMarkInstance(t *testing.T) { _, err := ms.SetIfNotExist(p) assert.NoError(t, err) - ps := NewPlacementService(ms, placement.NewOptions().SetValidZone("z1")) + ps := NewPlacementService(ms, WithPlacementOptions(placement.NewOptions().SetValidZone("z1"))) // instance not exist _, err = ps.MarkInstanceAvailable("i6") @@ -553,7 +567,7 @@ func TestFindReplaceInstance(t *testing.T) { }, } for _, test := range testCases { - p := NewPlacementService(nil, test.opts).(*placementService) + p := NewPlacementService(nil, WithPlacementOptions(test.opts)).(*placementService) res, err := p.selector.SelectReplaceInstances(test.input, test.replaceIDs, s) if test.expectErr { assert.Error(t, err) @@ -663,7 +677,7 @@ func TestMirrorWorkflow(t *testing.T) { ps := NewPlacementService( newMockStorage(), - placement.NewOptions().SetValidZone("z1").SetIsMirrored(true), + WithPlacementOptions(placement.NewOptions().SetValidZone("z1").SetIsMirrored(true)), ) p, err := ps.BuildInitialPlacement( @@ -758,7 +772,8 @@ func TestMirrorWorkflow(t *testing.T) { } func TestManyShards(t *testing.T) { - p := NewPlacementService(newMockStorage(), placement.NewOptions().SetValidZone("z1")) + p := NewPlacementService(newMockStorage(), + WithPlacementOptions(placement.NewOptions().SetValidZone("z1"))) i1 := placement.NewEmptyInstance("i1", "r1", "z1", "endpoint", 2) i2 := placement.NewEmptyInstance("i2", "r2", "z1", "endpoint", 2) i3 := placement.NewEmptyInstance("i3", "r3", "z1", "endpoint", 2) @@ -816,7 +831,7 @@ func TestAddMultipleInstances(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ps := NewPlacementService(newMockStorage(), test.opts) + ps := NewPlacementService(newMockStorage(), WithPlacementOptions(test.opts)) _, err := ps.BuildInitialPlacement(test.initialInstances, 4, 2) require.NoError(t, err) @@ -897,7 +912,7 @@ func TestReplaceInstances(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ps := NewPlacementService(newMockStorage(), test.opts) + ps := NewPlacementService(newMockStorage(), WithPlacementOptions(test.opts)) _, err := ps.BuildInitialPlacement(test.initialInstances, 4, 2) require.NoError(t, err) @@ -913,7 +928,8 @@ func TestReplaceInstances(t *testing.T) { } func TestValidateFnBeforeUpdate(t *testing.T) { - p := NewPlacementService(newMockStorage(), placement.NewOptions().SetValidZone("z1")).(*placementService) + p := NewPlacementService(newMockStorage(), + WithPlacementOptions(placement.NewOptions().SetValidZone("z1"))).(*placementService) _, err := p.BuildInitialPlacement( []placement.Instance{placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1)}, @@ -927,6 +943,23 @@ func TestValidateFnBeforeUpdate(t *testing.T) { assert.Equal(t, expectErr, err) } +func TestPlacementServiceImplOptions(t *testing.T) { + placementOptions := placement.NewOptions().SetValidZone("foozone").SetIsSharded(true) + al := algo.NewAlgorithm(placementOptions.SetIsSharded(false)) + + defaultImpl := newPlacementServiceImpl(nil) + require.NotNil(t, defaultImpl) + assert.NotNil(t, defaultImpl.opts) + assert.NotNil(t, defaultImpl.algo) + assert.NotEqual(t, placementOptions.ValidZone(), defaultImpl.opts.ValidZone()) + + customImpl := newPlacementServiceImpl(nil, + WithPlacementOptions(placementOptions), + WithAlgorithm(al)) + assert.Equal(t, placementOptions.ValidZone(), customImpl.opts.ValidZone()) + assert.Equal(t, al, customImpl.algo) +} + func newMockStorage() placement.Storage { return storage.NewPlacementStorage(mem.NewStore(), "", nil) } diff --git a/src/cluster/services/services.go b/src/cluster/services/services.go index 2bf03263cb..7ef98aec7c 100644 --- a/src/cluster/services/services.go +++ b/src/cluster/services/services.go @@ -154,7 +154,7 @@ func (c *client) PlacementService(sid ServiceID, opts placement.Options) (placem return ps.NewPlacementService( storage.NewPlacementStorage(store, c.placementKeyFn(sid), opts), - opts, + ps.WithPlacementOptions(opts), ), nil } diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index 2089552c01..b1b61df731 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -1052,7 +1052,7 @@ func (o DownsamplerOptions) newAggregatorPlacementManager( placementSvc := placementservice.NewPlacementService( placementstorage.NewPlacementStorage(localKVStore, placementKVKey, placementOpts), - placementOpts) + placementservice.WithPlacementOptions(placementOpts)) _, err := placementSvc.BuildInitialPlacement([]placement.Instance{instance}, numShards, replicationFactor) diff --git a/src/msg/integration/setup.go b/src/msg/integration/setup.go index d797077021..0c377a2f46 100644 --- a/src/msg/integration/setup.go +++ b/src/msg/integration/setup.go @@ -536,7 +536,10 @@ func (c *testConsumer) consumeAndAck(totalConsumed *atomic.Int64) { func testPlacementService(store kv.Store, sid services.ServiceID, isSharded bool) placement.Service { opts := placement.NewOptions().SetShardStateMode(placement.StableShardStateOnly).SetIsSharded(isSharded) - return service.NewPlacementService(storage.NewPlacementStorage(store, sid.String(), opts), opts) + + return service.NewPlacementService( + storage.NewPlacementStorage(store, sid.String(), opts), + service.WithPlacementOptions(opts)) } func testProducer( diff --git a/src/msg/producer/writer/consumer_service_writer_test.go b/src/msg/producer/writer/consumer_service_writer_test.go index f9d66e02a1..f88804e3e7 100644 --- a/src/msg/producer/writer/consumer_service_writer_test.go +++ b/src/msg/producer/writer/consumer_service_writer_test.go @@ -608,7 +608,8 @@ func TestConsumerServiceWriterUpdateNonShardedPlacementWithReplicatedConsumption cs := topic.NewConsumerService().SetServiceID(sid).SetConsumptionType(topic.Replicated) sd := services.NewMockServices(ctrl) pOpts := placement.NewOptions().SetIsSharded(false) - ps := service.NewPlacementService(storage.NewPlacementStorage(mem.NewStore(), sid.String(), pOpts), pOpts) + ps := service.NewPlacementService(storage.NewPlacementStorage(mem.NewStore(), sid.String(), pOpts), + service.WithPlacementOptions(pOpts)) sd.EXPECT().PlacementService(sid, gomock.Any()).Return(ps, nil) _, err := ps.BuildInitialPlacement([]placement.Instance{ placement.NewInstance().SetID("i1").SetEndpoint("i1").SetWeight(1), @@ -668,5 +669,7 @@ func TestConsumerServiceCloseShardWritersConcurrently(t *testing.T) { } func testPlacementService(store kv.Store, sid services.ServiceID) placement.Service { - return service.NewPlacementService(storage.NewPlacementStorage(store, sid.String(), placement.NewOptions()), placement.NewOptions()) + return service.NewPlacementService( + storage.NewPlacementStorage(store, sid.String(), placement.NewOptions()), + ) } diff --git a/src/query/api/v1/handler/placement/get_test.go b/src/query/api/v1/handler/placement/get_test.go index 40edc98375..fdae937481 100644 --- a/src/query/api/v1/handler/placement/get_test.go +++ b/src/query/api/v1/handler/placement/get_test.go @@ -70,7 +70,9 @@ func setupPlacementTest(t *testing.T, ctrl *gomock.Controller, initPlacement pla mockClient.EXPECT().Services(gomock.Any()).Return(mockServices, nil).AnyTimes() mockServices.EXPECT().PlacementService(gomock.Any(), gomock.Any()).DoAndReturn( func(_ interface{}, opts placement.Options) (placement.Service, error) { - ps := service.NewPlacementService(storage.NewPlacementStorage(mem.NewStore(), "", opts), opts) + ps := service.NewPlacementService( + storage.NewPlacementStorage(mem.NewStore(), "", opts), + service.WithPlacementOptions(opts)) if initPlacement != nil { _, err := ps.Set(initPlacement) require.NoError(t, err)