Skip to content

Commit

Permalink
Merge branch 'master' into wesley-lockfile
Browse files Browse the repository at this point in the history
  • Loading branch information
wesleyk authored Nov 9, 2020
2 parents eb1d9a1 + aa361f4 commit 0210abc
Show file tree
Hide file tree
Showing 16 changed files with 200 additions and 72 deletions.
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ linters:
# We allow cuddling assignment following conditions because there are valid
# logical groupings for this use-case (e.g. when evaluating config values).
- wsl
# New line required before return would require a large fraction of the
# code base to need updating, it's not worth the perceived benefit.
- nlreturn
disable-all: false
presets:
# bodyclose, errcheck, gosec, govet, scopelint, staticcheck, typecheck
Expand Down
6 changes: 4 additions & 2 deletions src/cluster/client/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ import (
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/retry"

"go.etcd.io/etcd/clientv3"
"github.com/uber-go/tally"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -282,9 +282,10 @@ func newClient(cluster Cluster) (*clientv3.Client, error) {
return nil, err
}
cfg := clientv3.Config{
AutoSyncInterval: cluster.AutoSyncInterval(),
DialTimeout: cluster.DialTimeout(),
Endpoints: cluster.Endpoints(),
TLS: tls,
AutoSyncInterval: cluster.AutoSyncInterval(),
}

if opts := cluster.KeepAliveOptions(); opts.KeepAliveEnabled() {
Expand All @@ -296,6 +297,7 @@ func newClient(cluster Cluster) (*clientv3.Client, error) {
}
cfg.DialKeepAliveTime = keepAlivePeriod
cfg.DialKeepAliveTimeout = opts.KeepAliveTimeout()
cfg.PermitWithoutStream = true
}

return clientv3.New(cfg)
Expand Down
6 changes: 3 additions & 3 deletions src/cluster/client/etcd/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ m3sd:
require.True(t, exists)
keepAliveOpts = cluster2.KeepAliveOptions()
require.Equal(t, true, keepAliveOpts.KeepAliveEnabled())
require.Equal(t, 5*time.Minute, keepAliveOpts.KeepAlivePeriod())
require.Equal(t, 5*time.Minute, keepAliveOpts.KeepAlivePeriodMaxJitter())
require.Equal(t, 20*time.Second, keepAliveOpts.KeepAliveTimeout())
require.Equal(t, 20*time.Second, keepAliveOpts.KeepAlivePeriod())
require.Equal(t, 10*time.Second, keepAliveOpts.KeepAlivePeriodMaxJitter())
require.Equal(t, 10*time.Second, keepAliveOpts.KeepAliveTimeout())

t.Run("TestOptionsNewDirectoryMode", func(t *testing.T) {
opts := cfg.NewOptions()
Expand Down
28 changes: 23 additions & 5 deletions src/cluster/client/etcd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@ import (
)

const (
defaultAutoSyncInterval = 1 * time.Minute
defaultDialTimeout = 15 * time.Second

defaultKeepAliveEnabled = true
defaultKeepAlivePeriod = 5 * time.Minute
defaultKeepAlivePeriodMaxJitter = 5 * time.Minute
defaultKeepAliveTimeout = 20 * time.Second
defaultKeepAlivePeriod = 20 * time.Second
defaultKeepAlivePeriodMaxJitter = 10 * time.Second
defaultKeepAliveTimeout = 10 * time.Second

defaultRetryInitialBackoff = 2 * time.Second
defaultRetryBackoffFactor = 2.0
Expand Down Expand Up @@ -316,8 +319,10 @@ func (o options) NewDirectoryMode() os.FileMode {
// NewCluster creates a Cluster.
func NewCluster() Cluster {
return cluster{
keepAliveOpts: NewKeepAliveOptions(),
tlsOpts: NewTLSOptions(),
autoSyncInterval: defaultAutoSyncInterval,
dialTimeout: defaultDialTimeout,
keepAliveOpts: NewKeepAliveOptions(),
tlsOpts: NewTLSOptions(),
}
}

Expand All @@ -327,6 +332,7 @@ type cluster struct {
keepAliveOpts KeepAliveOptions
tlsOpts TLSOptions
autoSyncInterval time.Duration
dialTimeout time.Duration
}

func (c cluster) Zone() string {
Expand Down Expand Up @@ -373,3 +379,15 @@ func (c cluster) SetAutoSyncInterval(autoSyncInterval time.Duration) Cluster {
c.autoSyncInterval = autoSyncInterval
return c
}

//nolint:gocritic
func (c cluster) DialTimeout() time.Duration {
return c.dialTimeout
}

//nolint:gocritic
func (c cluster) SetDialTimeout(dialTimeout time.Duration) Cluster {
c.dialTimeout = dialTimeout

return c
}
36 changes: 29 additions & 7 deletions src/cluster/client/etcd/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,22 @@ import (
)

func TestKeepAliveOptions(t *testing.T) {
opts := NewKeepAliveOptions().
opts := NewKeepAliveOptions()
require.Equal(t, defaultKeepAliveEnabled, opts.KeepAliveEnabled())
require.Equal(t, defaultKeepAlivePeriod, opts.KeepAlivePeriod())
require.Equal(t, defaultKeepAlivePeriodMaxJitter, opts.KeepAlivePeriodMaxJitter())
require.Equal(t, defaultKeepAliveTimeout, opts.KeepAliveTimeout())

opts = NewKeepAliveOptions().
SetKeepAliveEnabled(true).
SetKeepAlivePeriod(10 * time.Second).
SetKeepAlivePeriodMaxJitter(5 * time.Second).
SetKeepAliveTimeout(time.Second)
SetKeepAlivePeriod(1234 * time.Second).
SetKeepAlivePeriodMaxJitter(5000 * time.Second).
SetKeepAliveTimeout(time.Hour)

require.Equal(t, true, opts.KeepAliveEnabled())
require.Equal(t, 10*time.Second, opts.KeepAlivePeriod())
require.Equal(t, 5*time.Second, opts.KeepAlivePeriodMaxJitter())
require.Equal(t, time.Second, opts.KeepAliveTimeout())
require.Equal(t, 1234*time.Second, opts.KeepAlivePeriod())
require.Equal(t, 5000*time.Second, opts.KeepAlivePeriodMaxJitter())
require.Equal(t, time.Hour, opts.KeepAliveTimeout())
}

func TestCluster(t *testing.T) {
Expand All @@ -63,6 +69,22 @@ func TestCluster(t *testing.T) {
assert.Equal(t, "z", c.Zone())
assert.Equal(t, []string{"e1"}, c.Endpoints())
assert.Equal(t, aOpts, c.TLSOptions())
assert.Equal(t, defaultAutoSyncInterval, c.AutoSyncInterval())
assert.Equal(t, defaultDialTimeout, c.DialTimeout())

c = c.SetAutoSyncInterval(123 * time.Minute)
assert.Equal(t, "z", c.Zone())
assert.Equal(t, []string{"e1"}, c.Endpoints())
assert.Equal(t, aOpts, c.TLSOptions())
assert.Equal(t, 123*time.Minute, c.AutoSyncInterval())
assert.Equal(t, defaultDialTimeout, c.DialTimeout())

c = c.SetDialTimeout(42 * time.Hour)
assert.Equal(t, "z", c.Zone())
assert.Equal(t, []string{"e1"}, c.Endpoints())
assert.Equal(t, aOpts, c.TLSOptions())
assert.Equal(t, 123*time.Minute, c.AutoSyncInterval())
assert.Equal(t, 42*time.Hour, c.DialTimeout())
}

func TestTLSOptions(t *testing.T) {
Expand Down
5 changes: 4 additions & 1 deletion src/cluster/client/etcd/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ type Cluster interface {
TLSOptions() TLSOptions
SetTLSOptions(TLSOptions) Cluster

SetAutoSyncInterval(value time.Duration) Cluster
AutoSyncInterval() time.Duration
SetAutoSyncInterval(value time.Duration) Cluster

DialTimeout() time.Duration
SetDialTimeout(value time.Duration) Cluster
}
5 changes: 2 additions & 3 deletions src/cluster/placement/service/mirrored_custom_groups_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ const (
instG3I1 = "g3_i1"
instG3I2 = "g3_i2"
instG3I3 = "g3_i3"

)

var (
Expand Down Expand Up @@ -205,7 +204,7 @@ func mirroredCustomGroupSelectorSetup(t *testing.T) *mirroredCustomGroupSelector
tctx.Groups = testGroups

opts := placement.NewOptions().
SetValidZone(zone).
SetValidZone(zone).
SetIsMirrored(true)

tctx.Selector = selector.NewMirroredCustomGroupSelector(
Expand All @@ -217,7 +216,7 @@ func mirroredCustomGroupSelectorSetup(t *testing.T) *mirroredCustomGroupSelector

tctx.KVStore = mem.NewStore()
tctx.Storage = placementstorage.NewPlacementStorage(tctx.KVStore, "placement", tctx.Opts)
tctx.Service = NewPlacementService(tctx.Storage, tctx.Opts)
tctx.Service = NewPlacementService(tctx.Storage, WithPlacementOptions(tctx.Opts))
return tctx
}

Expand Down
5 changes: 2 additions & 3 deletions src/cluster/placement/service/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ import (
// given placement.
// If initialPlacement is nil, BuildInitialPlacement must be called before any operations on the
// placement.
func NewPlacementOperator(initialPlacement placement.Placement, opts placement.Options) placement.Operator {
func NewPlacementOperator(initialPlacement placement.Placement, opts ...Option) placement.Operator {
store := newDummyStore(initialPlacement)
return &placementOperator{
placementServiceImpl: newPlacementServiceImpl(opts, store),
placementServiceImpl: newPlacementServiceImpl(store, opts...),
store: store,
}
}
Expand Down Expand Up @@ -97,4 +97,3 @@ func (d *dummyStore) Placement() (placement.Placement, error) {
}
return d.curPlacement, nil
}

15 changes: 7 additions & 8 deletions src/cluster/placement/service/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,20 @@ import (
"testing"

"github.com/m3db/m3/src/cluster/placement"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestOperator(t *testing.T) {
type testDeps struct {
options placement.Options
op placement.Operator
op placement.Operator
}
setup := func(t *testing.T) testDeps {
options := placement.NewOptions().SetAllowAllZones(true)
return testDeps{
options: options,
op: NewPlacementOperator(nil, options),
op: NewPlacementOperator(nil, WithPlacementOptions(options)),
}
}

Expand All @@ -60,7 +59,7 @@ func TestOperator(t *testing.T) {

t.Run("end-to-end flow", func(t *testing.T) {
tdeps := setup(t)
op := NewPlacementOperator(nil, tdeps.options)
op := NewPlacementOperator(nil, WithPlacementOptions(tdeps.options))
store := newMockStorage()

pl, err := op.BuildInitialPlacement([]placement.Instance{newTestInstance()}, 10, 1)
Expand All @@ -81,7 +80,7 @@ func TestOperator(t *testing.T) {
require.NoError(t, err)

// expect exactly one version increment, from store.SetIfNotExist
assert.Equal(t, initialVersion + 1, pl.Version())
assert.Equal(t, initialVersion+1, pl.Version())

// spot check the results
allAvailable := true
Expand All @@ -94,15 +93,15 @@ func TestOperator(t *testing.T) {
})
}

type dummyStoreTestDeps struct{
type dummyStoreTestDeps struct {
store *dummyStore
pl placement.Placement
pl placement.Placement
}

func dummyStoreSetup(t *testing.T) dummyStoreTestDeps {
return dummyStoreTestDeps{
store: newDummyStore(nil),
pl: placement.NewPlacement(),
pl: placement.NewPlacement(),
}
}

Expand Down
66 changes: 54 additions & 12 deletions src/cluster/placement/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/m3db/m3/src/cluster/placement/algo"
"github.com/m3db/m3/src/cluster/placement/selector"
"github.com/m3db/m3/src/cluster/shard"

"go.uber.org/zap"
)

Expand All @@ -37,36 +36,79 @@ type placementService struct {
}

// NewPlacementService returns an instance of placement service.
func NewPlacementService(s placement.Storage, opts placement.Options) placement.Service {
func NewPlacementService(s placement.Storage, opts ...Option) placement.Service {
return &placementService{
Storage: s,
placementServiceImpl: newPlacementServiceImpl(
opts,
s,

opts...,
),
}
}

type options struct {
placementAlgorithm placement.Algorithm
placementOpts placement.Options
}

// Option is an interface for PlacementService options.
type Option interface {
apply(*options)
}

// WithAlgorithm sets the algorithm implementation that will be used by PlacementService.
func WithAlgorithm(algo placement.Algorithm) Option {
return &algorithmOption{placementAlgorithm: algo}
}

type algorithmOption struct {
placementAlgorithm placement.Algorithm
}

func (a *algorithmOption) apply(opts *options) {
opts.placementAlgorithm = a.placementAlgorithm
}

type placementOptionsOption struct {
opts placement.Options
}

func (a *placementOptionsOption) apply(opts *options) {
opts.placementOpts = a.opts
}

// WithPlacementOptions sets the placement options for PlacementService.
func WithPlacementOptions(opts placement.Options) Option {
return &placementOptionsOption{opts: opts}
}

func newPlacementServiceImpl(
opts placement.Options,
storage minimalPlacementStorage,
opts ...Option,
) *placementServiceImpl {
if opts == nil {
opts = placement.NewOptions()
o := options{
placementOpts: placement.NewOptions(),
}

for _, opt := range opts {
opt.apply(&o)
}

if o.placementAlgorithm == nil {
o.placementAlgorithm = algo.NewAlgorithm(o.placementOpts)
}

instanceSelector := opts.InstanceSelector()
instanceSelector := o.placementOpts.InstanceSelector()
if instanceSelector == nil {
instanceSelector = selector.NewInstanceSelector(opts)
instanceSelector = selector.NewInstanceSelector(o.placementOpts)
}

return &placementServiceImpl{
store: storage,
opts: opts,
algo: algo.NewAlgorithm(opts),
opts: o.placementOpts,
algo: o.placementAlgorithm,
selector: instanceSelector,
logger: opts.InstrumentOptions().Logger(),
logger: o.placementOpts.InstrumentOptions().Logger(),
}
}

Expand Down
Loading

0 comments on commit 0210abc

Please sign in to comment.