diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go index 1b0624b661..9248a878f4 100644 --- a/src/cmd/services/m3dbnode/config/config_test.go +++ b/src/cmd/services/m3dbnode/config/config_test.go @@ -411,6 +411,7 @@ func TestConfiguration(t *testing.T) { fetchSeriesBlocksBatchConcurrency: null fetchSeriesBlocksBatchSize: null writeShardsInitializing: null + shardsLeavingCountTowardsConsistency: null gcPercentage: 100 writeNewSeriesLimitPerSecond: 1048576 writeNewSeriesBackoffDuration: 2ms diff --git a/src/dbnode/client/aggregate_results_accumulator_consistency_test.go b/src/dbnode/client/aggregate_results_accumulator_consistency_test.go index e3f2b3d9d0..71a79ccf05 100644 --- a/src/dbnode/client/aggregate_results_accumulator_consistency_test.go +++ b/src/dbnode/client/aggregate_results_accumulator_consistency_test.go @@ -154,6 +154,71 @@ func TestAggregateResultsAccumulatorShardAvailabilityIsEnforced(t *testing.T) { }, }.run() + // for consistency level unstrict all + testFetchStateWorkflow{ + t: t, + topoMap: topoMap, + level: topology.ReadConsistencyLevelUnstrictAll, + steps: []testFetchStateWorklowStep{ + testFetchStateWorklowStep{ + hostname: "testhost1", + aggregateResult: &testAggregateSuccessResponse, + }, + testFetchStateWorklowStep{ + hostname: "testhost2", + aggregateResult: &testAggregateSuccessResponse, + }, + testFetchStateWorklowStep{ + hostname: "testhost0", + aggregateErr: errTestAggregate, + expectedDone: true, + expectedErr: false, + }, + }, + }.run() + testFetchStateWorkflow{ + t: t, + topoMap: topoMap, + level: topology.ReadConsistencyLevelUnstrictAll, + steps: []testFetchStateWorklowStep{ + testFetchStateWorklowStep{ + hostname: "testhost1", + aggregateErr: errTestAggregate, + }, + testFetchStateWorklowStep{ + hostname: "testhost2", + aggregateResult: &testAggregateSuccessResponse, + }, + testFetchStateWorklowStep{ + hostname: "testhost0", + aggregateErr: errTestAggregate, + expectedDone: true, + expectedErr: false, + }, + }, + }.run() + testFetchStateWorkflow{ + t: t, + topoMap: topoMap, + level: topology.ReadConsistencyLevelUnstrictAll, + steps: []testFetchStateWorklowStep{ + testFetchStateWorklowStep{ + hostname: "testhost1", + aggregateErr: errTestAggregate, + }, + testFetchStateWorklowStep{ + hostname: "testhost2", + aggregateErr: errTestAggregate, + }, + testFetchStateWorklowStep{ + hostname: "testhost0", + aggregateErr: errTestAggregate, + expectedDone: true, + expectedErr: true, + }, + }, + }.run() + // for consistency level all testFetchStateWorkflow{ t: t, diff --git a/src/dbnode/client/client_mock.go b/src/dbnode/client/client_mock.go index 0eddc159c9..541df13d09 100644 --- a/src/dbnode/client/client_mock.go +++ b/src/dbnode/client/client_mock.go @@ -1804,6 +1804,34 @@ func (mr *MockOptionsMockRecorder) WriteShardsInitializing() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteShardsInitializing", reflect.TypeOf((*MockOptions)(nil).WriteShardsInitializing)) } +// SetShardsLeavingCountTowardsConsistency mocks base method +func (m *MockOptions) SetShardsLeavingCountTowardsConsistency(value bool) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetShardsLeavingCountTowardsConsistency", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetShardsLeavingCountTowardsConsistency indicates an expected call of SetShardsLeavingCountTowardsConsistency +func (mr *MockOptionsMockRecorder) SetShardsLeavingCountTowardsConsistency(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetShardsLeavingCountTowardsConsistency", reflect.TypeOf((*MockOptions)(nil).SetShardsLeavingCountTowardsConsistency), value) +} + +// ShardsLeavingCountTowardsConsistency mocks base method +func (m *MockOptions) ShardsLeavingCountTowardsConsistency() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ShardsLeavingCountTowardsConsistency") + ret0, _ := ret[0].(bool) + return ret0 +} + +// ShardsLeavingCountTowardsConsistency indicates an expected call of ShardsLeavingCountTowardsConsistency +func (mr *MockOptionsMockRecorder) ShardsLeavingCountTowardsConsistency() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ShardsLeavingCountTowardsConsistency", reflect.TypeOf((*MockOptions)(nil).ShardsLeavingCountTowardsConsistency)) +} + // SetTagEncoderOptions mocks base method func (m *MockOptions) SetTagEncoderOptions(value serialize.TagEncoderOptions) Options { m.ctrl.T.Helper() @@ -3339,6 +3367,34 @@ func (mr *MockAdminOptionsMockRecorder) WriteShardsInitializing() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteShardsInitializing", reflect.TypeOf((*MockAdminOptions)(nil).WriteShardsInitializing)) } +// SetShardsLeavingCountTowardsConsistency mocks base method +func (m *MockAdminOptions) SetShardsLeavingCountTowardsConsistency(value bool) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetShardsLeavingCountTowardsConsistency", value) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetShardsLeavingCountTowardsConsistency indicates an expected call of SetShardsLeavingCountTowardsConsistency +func (mr *MockAdminOptionsMockRecorder) SetShardsLeavingCountTowardsConsistency(value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetShardsLeavingCountTowardsConsistency", reflect.TypeOf((*MockAdminOptions)(nil).SetShardsLeavingCountTowardsConsistency), value) +} + +// ShardsLeavingCountTowardsConsistency mocks base method +func (m *MockAdminOptions) ShardsLeavingCountTowardsConsistency() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ShardsLeavingCountTowardsConsistency") + ret0, _ := ret[0].(bool) + return ret0 +} + +// ShardsLeavingCountTowardsConsistency indicates an expected call of ShardsLeavingCountTowardsConsistency +func (mr *MockAdminOptionsMockRecorder) ShardsLeavingCountTowardsConsistency() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ShardsLeavingCountTowardsConsistency", reflect.TypeOf((*MockAdminOptions)(nil).ShardsLeavingCountTowardsConsistency)) +} + // SetTagEncoderOptions mocks base method func (m *MockAdminOptions) SetTagEncoderOptions(value serialize.TagEncoderOptions) Options { m.ctrl.T.Helper() diff --git a/src/dbnode/client/config.go b/src/dbnode/client/config.go index 7108970d44..c403ca211b 100644 --- a/src/dbnode/client/config.go +++ b/src/dbnode/client/config.go @@ -115,8 +115,13 @@ type Configuration struct { // from the remote peer. Defaults to 4096. FetchSeriesBlocksBatchSize *int `yaml:"fetchSeriesBlocksBatchSize"` - // WriteShardsInitializing sets whether or not to write to nodes that are initializing. + // WriteShardsInitializing sets whether or not writes to leaving shards + // count towards consistency, by default they do not. WriteShardsInitializing *bool `yaml:"writeShardsInitializing"` + + // ShardsLeavingCountTowardsConsistency sets whether or not writes to leaving shards + // count towards consistency, by default they do not. + ShardsLeavingCountTowardsConsistency *bool `yaml:"shardsLeavingCountTowardsConsistency"` } // ProtoConfiguration is the configuration for running with ProtoDataMode enabled. @@ -425,6 +430,9 @@ func (c Configuration) NewAdminClient( if c.WriteShardsInitializing != nil { v = v.SetWriteShardsInitializing(*c.WriteShardsInitializing) } + if c.ShardsLeavingCountTowardsConsistency != nil { + v = v.SetShardsLeavingCountTowardsConsistency(*c.ShardsLeavingCountTowardsConsistency) + } // Cast to admin options to apply admin config options. opts := v.(AdminOptions) diff --git a/src/dbnode/client/fetch_tagged_results_accumulator_consistency_prop_test.go b/src/dbnode/client/fetch_tagged_results_accumulator_consistency_prop_test.go index d01b9c543e..64155d0f65 100644 --- a/src/dbnode/client/fetch_tagged_results_accumulator_consistency_prop_test.go +++ b/src/dbnode/client/fetch_tagged_results_accumulator_consistency_prop_test.go @@ -112,6 +112,7 @@ func TestFetchTaggedResultsAccumulatorGenerativeConsistencyCheck(t *testing.T) { topology.ReadConsistencyLevelMajority, topology.ReadConsistencyLevelOne, topology.ReadConsistencyLevelUnstrictMajority, + topology.ReadConsistencyLevelUnstrictAll, ), )) @@ -189,6 +190,7 @@ func TestAggregateResultsAccumulatorGenerativeConsistencyCheck(t *testing.T) { topology.ReadConsistencyLevelMajority, topology.ReadConsistencyLevelOne, topology.ReadConsistencyLevelUnstrictMajority, + topology.ReadConsistencyLevelUnstrictAll, ), )) @@ -219,6 +221,8 @@ func (res topoAndHostResponses) meetsConsistencyCheckForLevel( requiredNumSuccessResponsePerShard = majority case topology.ReadConsistencyLevelUnstrictMajority: fallthrough + case topology.ReadConsistencyLevelUnstrictAll: + fallthrough case topology.ReadConsistencyLevelOne: requiredNumSuccessResponsePerShard = 1 } diff --git a/src/dbnode/client/fetch_tagged_results_accumulator_consistency_test.go b/src/dbnode/client/fetch_tagged_results_accumulator_consistency_test.go index d50426575f..e853db94a3 100644 --- a/src/dbnode/client/fetch_tagged_results_accumulator_consistency_test.go +++ b/src/dbnode/client/fetch_tagged_results_accumulator_consistency_test.go @@ -152,6 +152,71 @@ func TestFetchTaggedResultsAccumulatorShardAvailabilityIsEnforced(t *testing.T) }, }.run() + // for consistency level unstrict all + testFetchStateWorkflow{ + t: t, + topoMap: topoMap, + level: topology.ReadConsistencyLevelUnstrictAll, + steps: []testFetchStateWorklowStep{ + testFetchStateWorklowStep{ + hostname: "testhost1", + fetchTaggedResult: &testFetchTaggedSuccessResponse, + }, + testFetchStateWorklowStep{ + hostname: "testhost2", + fetchTaggedResult: &testFetchTaggedSuccessResponse, + }, + testFetchStateWorklowStep{ + hostname: "testhost0", + fetchTaggedErr: errTestFetchTagged, + expectedDone: true, + expectedErr: false, + }, + }, + }.run() + testFetchStateWorkflow{ + t: t, + topoMap: topoMap, + level: topology.ReadConsistencyLevelUnstrictAll, + steps: []testFetchStateWorklowStep{ + testFetchStateWorklowStep{ + hostname: "testhost1", + fetchTaggedErr: errTestFetchTagged, + }, + testFetchStateWorklowStep{ + hostname: "testhost2", + fetchTaggedResult: &testFetchTaggedSuccessResponse, + }, + testFetchStateWorklowStep{ + hostname: "testhost0", + fetchTaggedErr: errTestFetchTagged, + expectedDone: true, + expectedErr: false, + }, + }, + }.run() + testFetchStateWorkflow{ + t: t, + topoMap: topoMap, + level: topology.ReadConsistencyLevelUnstrictAll, + steps: []testFetchStateWorklowStep{ + testFetchStateWorklowStep{ + hostname: "testhost1", + fetchTaggedErr: errTestFetchTagged, + }, + testFetchStateWorklowStep{ + hostname: "testhost2", + fetchTaggedErr: errTestFetchTagged, + }, + testFetchStateWorklowStep{ + hostname: "testhost0", + fetchTaggedErr: errTestFetchTagged, + expectedDone: true, + expectedErr: true, + }, + }, + }.run() + // for consistency level all testFetchStateWorkflow{ t: t, diff --git a/src/dbnode/client/fetch_tagged_results_accumulator_merge_test.go b/src/dbnode/client/fetch_tagged_results_accumulator_merge_test.go index 9614dc9f56..0981bf9685 100644 --- a/src/dbnode/client/fetch_tagged_results_accumulator_merge_test.go +++ b/src/dbnode/client/fetch_tagged_results_accumulator_merge_test.go @@ -320,7 +320,7 @@ func (tm testFetchStateWorkflow) run() fetchTaggedResultAccumulator { accum = newFetchTaggedResultAccumulator() accum.Clear() accum.Reset(tm.startTime, tm.endTime, tm.topoMap, majority, tm.level) - for _, s := range tm.steps { + for i, s := range tm.steps { var ( done bool err error @@ -341,8 +341,8 @@ func (tm testFetchStateWorkflow) run() fetchTaggedResultAccumulator { default: assert.FailNow(tm.t, "unexpected workflow step", fmt.Sprintf("%+v", s)) } - assert.Equal(tm.t, s.expectedDone, done, fmt.Sprintf("%+v", s)) - assert.Equal(tm.t, s.expectedErr, err != nil, fmt.Sprintf("%+v", s)) + assert.Equal(tm.t, s.expectedDone, done, fmt.Sprintf("i=%d, step=%+v", i, s)) + assert.Equal(tm.t, s.expectedErr, err != nil, fmt.Sprintf("i=%d, step=%+v, err=%v", i, s, err)) } return accum } diff --git a/src/dbnode/client/options.go b/src/dbnode/client/options.go index 6b06004048..50c315b085 100644 --- a/src/dbnode/client/options.go +++ b/src/dbnode/client/options.go @@ -92,6 +92,9 @@ const ( // defaultWriteShardsInitializing is the default write to shards intializing value defaultWriteShardsInitializing = true + // defaultShardsLeavingCountTowardsConsistency is the default shards leaving count towards consistency + defaultShardsLeavingCountTowardsConsistency = false + // defaultIdentifierPoolSize is the default identifier pool size defaultIdentifierPoolSize = 8192 @@ -253,6 +256,7 @@ type options struct { fetchRetrier xretry.Retrier streamBlocksRetrier xretry.Retrier writeShardsInitializing bool + shardsLeavingCountTowardsConsistency bool newConnectionFn NewConnectionFn readerIteratorAllocate encoding.ReaderIteratorAllocate writeOperationPoolSize int @@ -370,6 +374,7 @@ func newOptions() *options { writeRetrier: defaultWriteRetrier, fetchRetrier: defaultFetchRetrier, writeShardsInitializing: defaultWriteShardsInitializing, + shardsLeavingCountTowardsConsistency: defaultShardsLeavingCountTowardsConsistency, tagEncoderPoolSize: defaultTagEncoderPoolSize, tagEncoderOpts: serialize.NewTagEncoderOptions(), tagDecoderPoolSize: defaultTagDecoderPoolSize, @@ -719,6 +724,16 @@ func (o *options) WriteShardsInitializing() bool { return o.writeShardsInitializing } +func (o *options) SetShardsLeavingCountTowardsConsistency(value bool) Options { + opts := *o + opts.shardsLeavingCountTowardsConsistency = value + return &opts +} + +func (o *options) ShardsLeavingCountTowardsConsistency() bool { + return o.shardsLeavingCountTowardsConsistency +} + func (o *options) SetTagEncoderOptions(value serialize.TagEncoderOptions) Options { opts := *o opts.tagEncoderOpts = value diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index c53661c728..091dab455d 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -136,31 +136,32 @@ type sessionState struct { } type session struct { - state sessionState - opts Options - runtimeOptsListenerCloser xclose.Closer - scope tally.Scope - nowFn clock.NowFn - log *zap.Logger - logWriteErrorSampler *sampler.Sampler - logFetchErrorSampler *sampler.Sampler - newHostQueueFn newHostQueueFn - writeRetrier xretry.Retrier - fetchRetrier xretry.Retrier - streamBlocksRetrier xretry.Retrier - pools sessionPools - fetchBatchSize int - newPeerBlocksQueueFn newPeerBlocksQueueFn - reattemptStreamBlocksFromPeersFn reattemptStreamBlocksFromPeersFn - pickBestPeerFn pickBestPeerFn - origin topology.Host - streamBlocksMaxBlockRetries int - streamBlocksWorkers xsync.WorkerPool - streamBlocksBatchSize int - streamBlocksMetadataBatchTimeout time.Duration - streamBlocksBatchTimeout time.Duration - writeShardsInitializing bool - metrics sessionMetrics + state sessionState + opts Options + runtimeOptsListenerCloser xclose.Closer + scope tally.Scope + nowFn clock.NowFn + log *zap.Logger + logWriteErrorSampler *sampler.Sampler + logFetchErrorSampler *sampler.Sampler + newHostQueueFn newHostQueueFn + writeRetrier xretry.Retrier + fetchRetrier xretry.Retrier + streamBlocksRetrier xretry.Retrier + pools sessionPools + fetchBatchSize int + newPeerBlocksQueueFn newPeerBlocksQueueFn + reattemptStreamBlocksFromPeersFn reattemptStreamBlocksFromPeersFn + pickBestPeerFn pickBestPeerFn + origin topology.Host + streamBlocksMaxBlockRetries int + streamBlocksWorkers xsync.WorkerPool + streamBlocksBatchSize int + streamBlocksMetadataBatchTimeout time.Duration + streamBlocksBatchTimeout time.Duration + writeShardsInitializing bool + shardsLeavingCountTowardsConsistency bool + metrics sessionMetrics } type shardMetricsKey struct { @@ -289,8 +290,9 @@ func newSession(opts Options) (clientSession, error) { context: opts.ContextPool(), id: opts.IdentifierPool(), }, - writeShardsInitializing: opts.WriteShardsInitializing(), - metrics: newSessionMetrics(scope), + writeShardsInitializing: opts.WriteShardsInitializing(), + shardsLeavingCountTowardsConsistency: opts.ShardsLeavingCountTowardsConsistency(), + metrics: newSessionMetrics(scope), } s.reattemptStreamBlocksFromPeersFn = s.streamBlocksReattemptFromPeers s.pickBestPeerFn = s.streamBlocksPickBestPeer @@ -1136,6 +1138,7 @@ func (s *session) writeAttemptWithRLock( state := s.pools.writeState.Get() state.consistencyLevel = s.state.writeLevel + state.shardsLeavingCountTowardsConsistency = s.shardsLeavingCountTowardsConsistency state.topoMap = s.state.topoMap state.incRef() diff --git a/src/dbnode/client/session_fetch_test.go b/src/dbnode/client/session_fetch_test.go index f1b81dbc27..c0c8afa717 100644 --- a/src/dbnode/client/session_fetch_test.go +++ b/src/dbnode/client/session_fetch_test.go @@ -316,6 +316,16 @@ func TestSessionFetchReadConsistencyLevelAll(t *testing.T) { } } +func TestSessionFetchReadConsistencyLevelUnstrictAll(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + for i := 0; i <= 2; i++ { + testFetchConsistencyLevel(t, ctrl, topology.ReadConsistencyLevelUnstrictAll, i, outcomeSuccess) + } + testFetchConsistencyLevel(t, ctrl, topology.ReadConsistencyLevelUnstrictAll, 3, outcomeFail) +} + func TestSessionFetchReadConsistencyLevelMajority(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/src/dbnode/client/types.go b/src/dbnode/client/types.go index ccbd574f94..3d4040bd7a 100644 --- a/src/dbnode/client/types.go +++ b/src/dbnode/client/types.go @@ -419,6 +419,14 @@ type Options interface { // initializing or not. WriteShardsInitializing() bool + // SetShardsLeavingCountTowardsConsistency sets whether to count shards + // that are leaving or not towards consistency level calculations. + SetShardsLeavingCountTowardsConsistency(value bool) Options + + // ShardsLeavingCountTowardsConsistency returns whether to count shards + // that are leaving or not towards consistency level calculations. + ShardsLeavingCountTowardsConsistency() bool + // SetTagEncoderOptions sets the TagEncoderOptions. SetTagEncoderOptions(value serialize.TagEncoderOptions) Options diff --git a/src/dbnode/client/write_state.go b/src/dbnode/client/write_state.go index 3af5d518f6..61a39d7b61 100644 --- a/src/dbnode/client/write_state.go +++ b/src/dbnode/client/write_state.go @@ -48,15 +48,16 @@ type writeState struct { sync.Mutex refCounter - consistencyLevel topology.ConsistencyLevel - topoMap topology.Map - op writeOp - nsID ident.ID - tsID ident.ID - tagEncoder serialize.TagEncoder - majority, pending int32 - success int32 - errors []error + consistencyLevel topology.ConsistencyLevel + shardsLeavingCountTowardsConsistency bool + topoMap topology.Map + op writeOp + nsID ident.ID + tsID ident.ID + tagEncoder serialize.TagEncoder + majority, pending int32 + success int32 + errors []error queues []hostQueue tagEncoderPool serialize.TagEncoderPool @@ -128,20 +129,29 @@ func (w *writeState) completionFn(result interface{}, err error) { } else if shardState, err := hostShardSet.ShardSet().LookupStateByID(w.op.ShardID()); err != nil { errStr := "missing shard %d in host %s" wErr = xerrors.NewRetryableError(fmt.Errorf(errStr, w.op.ShardID(), hostID)) - } else if shardState != shard.Available { - // NB(bl): only count writes to available shards towards success - var errStr string - switch shardState { - case shard.Initializing: - errStr = "shard %d in host %s is not available (initializing)" - case shard.Leaving: - errStr = "shard %d in host %s not available (leaving)" - default: - errStr = "shard %d in host %s not available (unknown state)" - } - wErr = xerrors.NewRetryableError(fmt.Errorf(errStr, w.op.ShardID(), hostID)) } else { - w.success++ + available := shardState == shard.Available + leaving := shardState == shard.Leaving + leavingAndShardsLeavingCountTowardsConsistency := leaving && + w.shardsLeavingCountTowardsConsistency + // NB(bl): Only count writes to available shards towards success. + // NB(r): If shard is leaving and configured to allow writes to leaving + // shards to count towards consistency then allow that to count + // to success. + if !available && !leavingAndShardsLeavingCountTowardsConsistency { + var errStr string + switch shardState { + case shard.Initializing: + errStr = "shard %d in host %s is not available (initializing)" + case shard.Leaving: + errStr = "shard %d in host %s not available (leaving)" + default: + errStr = "shard %d in host %s not available (unknown state)" + } + wErr = xerrors.NewRetryableError(fmt.Errorf(errStr, w.op.ShardID(), hostID)) + } else { + w.success++ + } } if wErr != nil { diff --git a/src/dbnode/client/write_test.go b/src/dbnode/client/write_test.go index f2f7c7e5df..8b12d03e9a 100644 --- a/src/dbnode/client/write_test.go +++ b/src/dbnode/client/write_test.go @@ -123,6 +123,17 @@ func TestShardNotAvailable(t *testing.T) { writeTestTeardown(wState, &writeWg) } +func TestShardLeavingWithShardsLeavingCountTowardsConsistency(t *testing.T) { + var writeWg sync.WaitGroup + + wState, s, host := writeTestSetup(t, &writeWg) + wState.shardsLeavingCountTowardsConsistency = true + setShardStates(t, s, host, shard.Leaving) + wState.completionFn(host, nil) + assert.Equal(t, int32(1), wState.success) + writeTestTeardown(wState, &writeWg) +} + // utils func getWriteState(s *session, w writeStub) *writeState { diff --git a/src/dbnode/integration/fetch_tagged_quorum_test.go b/src/dbnode/integration/fetch_tagged_quorum_test.go index eeb07baabd..4bb04857ef 100644 --- a/src/dbnode/integration/fetch_tagged_quorum_test.go +++ b/src/dbnode/integration/fetch_tagged_quorum_test.go @@ -64,7 +64,7 @@ func TestFetchTaggedQuorumNormalOnlyOneUp(t *testing.T) { writeTagged(t, nodes[0]) testFetch.assertContainsTaggedResult(t, - topology.ReadConsistencyLevelOne, topology.ReadConsistencyLevelUnstrictMajority) + topology.ReadConsistencyLevelOne, topology.ReadConsistencyLevelUnstrictMajority, topology.ReadConsistencyLevelUnstrictAll) testFetch.assertFailsTaggedResult(t, topology.ReadConsistencyLevelAll, topology.ReadConsistencyLevelMajority) } @@ -92,7 +92,7 @@ func TestFetchTaggedQuorumNormalOnlyTwoUp(t *testing.T) { // succeed to two nodes testFetch.assertContainsTaggedResult(t, topology.ReadConsistencyLevelOne, - topology.ReadConsistencyLevelUnstrictMajority, topology.ReadConsistencyLevelMajority) + topology.ReadConsistencyLevelUnstrictMajority, topology.ReadConsistencyLevelMajority, topology.ReadConsistencyLevelUnstrictAll) testFetch.assertFailsTaggedResult(t, topology.ReadConsistencyLevelAll) } @@ -120,8 +120,9 @@ func TestFetchTaggedQuorumNormalAllUp(t *testing.T) { // succeed to all nodes testFetch.assertContainsTaggedResult(t, - topology.ReadConsistencyLevelOne, topology.ReadConsistencyLevelUnstrictMajority, - topology.ReadConsistencyLevelMajority, topology.ReadConsistencyLevelAll) + topology.ReadConsistencyLevelOne, + topology.ReadConsistencyLevelUnstrictMajority, topology.ReadConsistencyLevelMajority, + topology.ReadConsistencyLevelUnstrictAll, topology.ReadConsistencyLevelAll) } func TestFetchTaggedQuorumAddNodeOnlyLeavingInitializingUp(t *testing.T) { @@ -148,8 +149,9 @@ func TestFetchTaggedQuorumAddNodeOnlyLeavingInitializingUp(t *testing.T) { // No fetches succeed to available nodes testFetch.assertFailsTaggedResult(t, - topology.ReadConsistencyLevelOne, topology.ReadConsistencyLevelUnstrictMajority, - topology.ReadConsistencyLevelMajority, topology.ReadConsistencyLevelAll) + topology.ReadConsistencyLevelOne, + topology.ReadConsistencyLevelUnstrictMajority, topology.ReadConsistencyLevelMajority, + topology.ReadConsistencyLevelUnstrictAll, topology.ReadConsistencyLevelAll) } func TestFetchTaggedQuorumAddNodeOnlyOneNormalAndLeavingInitializingUp(t *testing.T) { @@ -177,7 +179,7 @@ func TestFetchTaggedQuorumAddNodeOnlyOneNormalAndLeavingInitializingUp(t *testin // fetches succeed to one available node testFetch.assertContainsTaggedResult(t, - topology.ReadConsistencyLevelUnstrictMajority, topology.ReadConsistencyLevelOne) + topology.ReadConsistencyLevelOne, topology.ReadConsistencyLevelUnstrictMajority, topology.ReadConsistencyLevelUnstrictAll) testFetch.assertFailsTaggedResult(t, topology.ReadConsistencyLevelMajority, topology.ReadConsistencyLevelAll) @@ -209,7 +211,8 @@ func TestFetchTaggedQuorumAddNodeAllUp(t *testing.T) { writeTagged(t, nodes...) testFetch.assertContainsTaggedResult(t, topology.ReadConsistencyLevelOne, - topology.ReadConsistencyLevelUnstrictMajority, topology.ReadConsistencyLevelMajority) + topology.ReadConsistencyLevelUnstrictMajority, topology.ReadConsistencyLevelMajority, + topology.ReadConsistencyLevelUnstrictAll) testFetch.assertFailsTaggedResult(t, topology.ReadConsistencyLevelAll) } diff --git a/src/dbnode/integration/index_multiple_node_high_concurrency_test.go b/src/dbnode/integration/index_multiple_node_high_concurrency_test.go index 876c03693f..79c5cc328e 100644 --- a/src/dbnode/integration/index_multiple_node_high_concurrency_test.go +++ b/src/dbnode/integration/index_multiple_node_high_concurrency_test.go @@ -55,6 +55,7 @@ func TestIndexMultipleNodeHighConcurrency(t *testing.T) { topology.ReadConsistencyLevelOne, topology.ReadConsistencyLevelUnstrictMajority, topology.ReadConsistencyLevelMajority, + topology.ReadConsistencyLevelUnstrictAll, topology.ReadConsistencyLevelAll, } for _, lvl := range levels { diff --git a/src/dbnode/topology/consistency_level.go b/src/dbnode/topology/consistency_level.go index 0629566f95..64dd50b367 100644 --- a/src/dbnode/topology/consistency_level.go +++ b/src/dbnode/topology/consistency_level.go @@ -222,6 +222,11 @@ const ( // ReadConsistencyLevelMajority corresponds to reading from the majority of nodes ReadConsistencyLevelMajority + // ReadConsistencyLevelUnstrictAll corresponds to reading from all nodes + // but relaxing the constraint when it cannot be met, falling back to returning success when + // reading from at least a single node after attempting reading from all of nodes + ReadConsistencyLevelUnstrictAll + // ReadConsistencyLevelAll corresponds to reading from all of the nodes ReadConsistencyLevelAll ) @@ -237,6 +242,8 @@ func (l ReadConsistencyLevel) String() string { return unstrictMajority case ReadConsistencyLevelMajority: return majority + case ReadConsistencyLevelUnstrictAll: + return unstrictAll case ReadConsistencyLevelAll: return all } @@ -248,6 +255,7 @@ var validReadConsistencyLevels = []ReadConsistencyLevel{ ReadConsistencyLevelOne, ReadConsistencyLevelUnstrictMajority, ReadConsistencyLevelMajority, + ReadConsistencyLevelUnstrictAll, ReadConsistencyLevelAll, } @@ -302,6 +310,7 @@ const ( unknown = "unknown" any = "any" all = "all" + unstrictAll = "unstrict_all" one = "one" none = "none" majority = "majority" @@ -348,7 +357,7 @@ func ReadConsistencyTermination( return success > 0 || doneAll case ReadConsistencyLevelMajority, ReadConsistencyLevelUnstrictMajority: return success >= majority || doneAll - case ReadConsistencyLevelAll: + case ReadConsistencyLevelAll, ReadConsistencyLevelUnstrictAll: return doneAll } panic(fmt.Errorf("unrecognized consistency level: %s", level.String())) @@ -366,7 +375,7 @@ func ReadConsistencyAchieved( return numSuccess == numPeers // Meets all case ReadConsistencyLevelMajority: return numSuccess >= majority // Meets majority - case ReadConsistencyLevelOne, ReadConsistencyLevelUnstrictMajority: + case ReadConsistencyLevelOne, ReadConsistencyLevelUnstrictMajority, ReadConsistencyLevelUnstrictAll: return numSuccess > 0 // Meets one case ReadConsistencyLevelNone: return true // Always meets none @@ -378,7 +387,7 @@ func ReadConsistencyAchieved( // satisfy the read consistency. func NumDesiredForReadConsistency(level ReadConsistencyLevel, numReplicas, majority int) int { switch level { - case ReadConsistencyLevelAll: + case ReadConsistencyLevelAll, ReadConsistencyLevelUnstrictAll: return numReplicas case ReadConsistencyLevelMajority, ReadConsistencyLevelUnstrictMajority: return majority diff --git a/src/x/sync/pooled_worker_pool.go b/src/x/sync/pooled_worker_pool.go index e1eb1220e3..93af3f8c6d 100644 --- a/src/x/sync/pooled_worker_pool.go +++ b/src/x/sync/pooled_worker_pool.go @@ -35,6 +35,10 @@ const ( numGoroutinesGaugeSampleRate = 1000 ) +var ( + pooledWorkerPoolGoroutinesCapacity *int +) + type pooledWorkerPool struct { sync.Mutex numRoutinesAtomic int64 @@ -61,7 +65,15 @@ func NewPooledWorkerPool(size int, opts PooledWorkerPoolOptions) (PooledWorkerPo workChs := make([]chan Work, numShards) for i := range workChs { - workChs[i] = make(chan Work, int64(size)/numShards) + if c := pooledWorkerPoolGoroutinesCapacity; c != nil { + if *c == 0 { + workChs[i] = make(chan Work) + } else { + workChs[i] = make(chan Work, *c) + } + } else { + workChs[i] = make(chan Work, int64(size)/numShards) + } } return &pooledWorkerPool{ diff --git a/src/x/sync/pooled_worker_pool_test.go b/src/x/sync/pooled_worker_pool_test.go index 72296fc585..b6a39af31c 100644 --- a/src/x/sync/pooled_worker_pool_test.go +++ b/src/x/sync/pooled_worker_pool_test.go @@ -51,8 +51,15 @@ func TestPooledWorkerPoolGo(t *testing.T) { func TestPooledWorkerPoolGoWithTimeout(t *testing.T) { var ( - workers = 2 + workers = 2 + channelCapacity = 1 ) + // So we can control how empty the worker pool chanel is we + // set capacity to be same as num workers. + pooledWorkerPoolGoroutinesCapacity = &channelCapacity + defer func() { + pooledWorkerPoolGoroutinesCapacity = nil + }() p, err := NewPooledWorkerPool(workers, NewPooledWorkerPoolOptions()) require.NoError(t, err) @@ -77,8 +84,7 @@ func TestPooledWorkerPoolGoWithTimeout(t *testing.T) { wg.Done() - require.Equal(t, workers, resultsTrue) - require.Equal(t, workers, resultsFalse) + require.True(t, resultsFalse > 0) } func TestPooledWorkerPoolGrowOnDemand(t *testing.T) {