Skip to content

Commit

Permalink
[coordinator] Configurable writes to leaving shards count towards con…
Browse files Browse the repository at this point in the history
…sistency, add read level unstrict all (#2687)
  • Loading branch information
robskillington authored Oct 2, 2020
1 parent a700d56 commit 81c3e19
Show file tree
Hide file tree
Showing 18 changed files with 355 additions and 68 deletions.
1 change: 1 addition & 0 deletions src/cmd/services/m3dbnode/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ func TestConfiguration(t *testing.T) {
fetchSeriesBlocksBatchConcurrency: null
fetchSeriesBlocksBatchSize: null
writeShardsInitializing: null
shardsLeavingCountTowardsConsistency: null
gcPercentage: 100
writeNewSeriesLimitPerSecond: 1048576
writeNewSeriesBackoffDuration: 2ms
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,71 @@ func TestAggregateResultsAccumulatorShardAvailabilityIsEnforced(t *testing.T) {
},
}.run()

// for consistency level unstrict all
testFetchStateWorkflow{
t: t,
topoMap: topoMap,
level: topology.ReadConsistencyLevelUnstrictAll,
steps: []testFetchStateWorklowStep{
testFetchStateWorklowStep{
hostname: "testhost1",
aggregateResult: &testAggregateSuccessResponse,
},
testFetchStateWorklowStep{
hostname: "testhost2",
aggregateResult: &testAggregateSuccessResponse,
},
testFetchStateWorklowStep{
hostname: "testhost0",
aggregateErr: errTestAggregate,
expectedDone: true,
expectedErr: false,
},
},
}.run()
testFetchStateWorkflow{
t: t,
topoMap: topoMap,
level: topology.ReadConsistencyLevelUnstrictAll,
steps: []testFetchStateWorklowStep{
testFetchStateWorklowStep{
hostname: "testhost1",
aggregateErr: errTestAggregate,
},
testFetchStateWorklowStep{
hostname: "testhost2",
aggregateResult: &testAggregateSuccessResponse,
},
testFetchStateWorklowStep{
hostname: "testhost0",
aggregateErr: errTestAggregate,
expectedDone: true,
expectedErr: false,
},
},
}.run()
testFetchStateWorkflow{
t: t,
topoMap: topoMap,
level: topology.ReadConsistencyLevelUnstrictAll,
steps: []testFetchStateWorklowStep{
testFetchStateWorklowStep{
hostname: "testhost1",
aggregateErr: errTestAggregate,
},
testFetchStateWorklowStep{
hostname: "testhost2",
aggregateErr: errTestAggregate,
},
testFetchStateWorklowStep{
hostname: "testhost0",
aggregateErr: errTestAggregate,
expectedDone: true,
expectedErr: true,
},
},
}.run()

// for consistency level all
testFetchStateWorkflow{
t: t,
Expand Down
56 changes: 56 additions & 0 deletions src/dbnode/client/client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion src/dbnode/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,13 @@ type Configuration struct {
// from the remote peer. Defaults to 4096.
FetchSeriesBlocksBatchSize *int `yaml:"fetchSeriesBlocksBatchSize"`

// WriteShardsInitializing sets whether or not to write to nodes that are initializing.
// WriteShardsInitializing sets whether or not writes to leaving shards
// count towards consistency, by default they do not.
WriteShardsInitializing *bool `yaml:"writeShardsInitializing"`

// ShardsLeavingCountTowardsConsistency sets whether or not writes to leaving shards
// count towards consistency, by default they do not.
ShardsLeavingCountTowardsConsistency *bool `yaml:"shardsLeavingCountTowardsConsistency"`
}

// ProtoConfiguration is the configuration for running with ProtoDataMode enabled.
Expand Down Expand Up @@ -425,6 +430,9 @@ func (c Configuration) NewAdminClient(
if c.WriteShardsInitializing != nil {
v = v.SetWriteShardsInitializing(*c.WriteShardsInitializing)
}
if c.ShardsLeavingCountTowardsConsistency != nil {
v = v.SetShardsLeavingCountTowardsConsistency(*c.ShardsLeavingCountTowardsConsistency)
}

// Cast to admin options to apply admin config options.
opts := v.(AdminOptions)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func TestFetchTaggedResultsAccumulatorGenerativeConsistencyCheck(t *testing.T) {
topology.ReadConsistencyLevelMajority,
topology.ReadConsistencyLevelOne,
topology.ReadConsistencyLevelUnstrictMajority,
topology.ReadConsistencyLevelUnstrictAll,
),
))

Expand Down Expand Up @@ -189,6 +190,7 @@ func TestAggregateResultsAccumulatorGenerativeConsistencyCheck(t *testing.T) {
topology.ReadConsistencyLevelMajority,
topology.ReadConsistencyLevelOne,
topology.ReadConsistencyLevelUnstrictMajority,
topology.ReadConsistencyLevelUnstrictAll,
),
))

Expand Down Expand Up @@ -219,6 +221,8 @@ func (res topoAndHostResponses) meetsConsistencyCheckForLevel(
requiredNumSuccessResponsePerShard = majority
case topology.ReadConsistencyLevelUnstrictMajority:
fallthrough
case topology.ReadConsistencyLevelUnstrictAll:
fallthrough
case topology.ReadConsistencyLevelOne:
requiredNumSuccessResponsePerShard = 1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,71 @@ func TestFetchTaggedResultsAccumulatorShardAvailabilityIsEnforced(t *testing.T)
},
}.run()

// for consistency level unstrict all
testFetchStateWorkflow{
t: t,
topoMap: topoMap,
level: topology.ReadConsistencyLevelUnstrictAll,
steps: []testFetchStateWorklowStep{
testFetchStateWorklowStep{
hostname: "testhost1",
fetchTaggedResult: &testFetchTaggedSuccessResponse,
},
testFetchStateWorklowStep{
hostname: "testhost2",
fetchTaggedResult: &testFetchTaggedSuccessResponse,
},
testFetchStateWorklowStep{
hostname: "testhost0",
fetchTaggedErr: errTestFetchTagged,
expectedDone: true,
expectedErr: false,
},
},
}.run()
testFetchStateWorkflow{
t: t,
topoMap: topoMap,
level: topology.ReadConsistencyLevelUnstrictAll,
steps: []testFetchStateWorklowStep{
testFetchStateWorklowStep{
hostname: "testhost1",
fetchTaggedErr: errTestFetchTagged,
},
testFetchStateWorklowStep{
hostname: "testhost2",
fetchTaggedResult: &testFetchTaggedSuccessResponse,
},
testFetchStateWorklowStep{
hostname: "testhost0",
fetchTaggedErr: errTestFetchTagged,
expectedDone: true,
expectedErr: false,
},
},
}.run()
testFetchStateWorkflow{
t: t,
topoMap: topoMap,
level: topology.ReadConsistencyLevelUnstrictAll,
steps: []testFetchStateWorklowStep{
testFetchStateWorklowStep{
hostname: "testhost1",
fetchTaggedErr: errTestFetchTagged,
},
testFetchStateWorklowStep{
hostname: "testhost2",
fetchTaggedErr: errTestFetchTagged,
},
testFetchStateWorklowStep{
hostname: "testhost0",
fetchTaggedErr: errTestFetchTagged,
expectedDone: true,
expectedErr: true,
},
},
}.run()

// for consistency level all
testFetchStateWorkflow{
t: t,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (tm testFetchStateWorkflow) run() fetchTaggedResultAccumulator {
accum = newFetchTaggedResultAccumulator()
accum.Clear()
accum.Reset(tm.startTime, tm.endTime, tm.topoMap, majority, tm.level)
for _, s := range tm.steps {
for i, s := range tm.steps {
var (
done bool
err error
Expand All @@ -341,8 +341,8 @@ func (tm testFetchStateWorkflow) run() fetchTaggedResultAccumulator {
default:
assert.FailNow(tm.t, "unexpected workflow step", fmt.Sprintf("%+v", s))
}
assert.Equal(tm.t, s.expectedDone, done, fmt.Sprintf("%+v", s))
assert.Equal(tm.t, s.expectedErr, err != nil, fmt.Sprintf("%+v", s))
assert.Equal(tm.t, s.expectedDone, done, fmt.Sprintf("i=%d, step=%+v", i, s))
assert.Equal(tm.t, s.expectedErr, err != nil, fmt.Sprintf("i=%d, step=%+v, err=%v", i, s, err))
}
return accum
}
Expand Down
15 changes: 15 additions & 0 deletions src/dbnode/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ const (
// defaultWriteShardsInitializing is the default write to shards intializing value
defaultWriteShardsInitializing = true

// defaultShardsLeavingCountTowardsConsistency is the default shards leaving count towards consistency
defaultShardsLeavingCountTowardsConsistency = false

// defaultIdentifierPoolSize is the default identifier pool size
defaultIdentifierPoolSize = 8192

Expand Down Expand Up @@ -253,6 +256,7 @@ type options struct {
fetchRetrier xretry.Retrier
streamBlocksRetrier xretry.Retrier
writeShardsInitializing bool
shardsLeavingCountTowardsConsistency bool
newConnectionFn NewConnectionFn
readerIteratorAllocate encoding.ReaderIteratorAllocate
writeOperationPoolSize int
Expand Down Expand Up @@ -370,6 +374,7 @@ func newOptions() *options {
writeRetrier: defaultWriteRetrier,
fetchRetrier: defaultFetchRetrier,
writeShardsInitializing: defaultWriteShardsInitializing,
shardsLeavingCountTowardsConsistency: defaultShardsLeavingCountTowardsConsistency,
tagEncoderPoolSize: defaultTagEncoderPoolSize,
tagEncoderOpts: serialize.NewTagEncoderOptions(),
tagDecoderPoolSize: defaultTagDecoderPoolSize,
Expand Down Expand Up @@ -719,6 +724,16 @@ func (o *options) WriteShardsInitializing() bool {
return o.writeShardsInitializing
}

func (o *options) SetShardsLeavingCountTowardsConsistency(value bool) Options {
opts := *o
opts.shardsLeavingCountTowardsConsistency = value
return &opts
}

func (o *options) ShardsLeavingCountTowardsConsistency() bool {
return o.shardsLeavingCountTowardsConsistency
}

func (o *options) SetTagEncoderOptions(value serialize.TagEncoderOptions) Options {
opts := *o
opts.tagEncoderOpts = value
Expand Down
Loading

0 comments on commit 81c3e19

Please sign in to comment.