diff --git a/ring/replication_strategy.go b/ring/replication_strategy.go
index 4ff6f86ae..44e05a538 100644
--- a/ring/replication_strategy.go
+++ b/ring/replication_strategy.go
@@ -2,11 +2,8 @@ package ring
 
 import (
 	"fmt"
+	"strings"
 	"time"
-
-	"github.com/go-kit/log"
-	"github.com/go-kit/log/level"
-	"github.com/pkg/errors"
 )
 
 type ReplicationStrategy interface {
@@ -16,12 +13,10 @@ type ReplicationStrategy interface {
 	Filter(instances []InstanceDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration, zoneAwarenessEnabled bool) (healthy []InstanceDesc, maxFailures int, err error)
 }
 
-type defaultReplicationStrategy struct {
-	logger log.Logger
-}
+type defaultReplicationStrategy struct{}
 
-func NewDefaultReplicationStrategy(logger log.Logger) ReplicationStrategy {
-	return &defaultReplicationStrategy{logger: logger}
+func NewDefaultReplicationStrategy() ReplicationStrategy {
+	return &defaultReplicationStrategy{}
 }
 
 // Filter decides, given the set of instances eligible for a key,
@@ -44,12 +39,12 @@ func (s *defaultReplicationStrategy) Filter(instances []InstanceDesc, op Operati
 	// Skip those that have not heartbeated in a while. NB these are still
 	// included in the calculation of minSuccess, so if too many failed instances
 	// will cause the whole write to fail.
-	var skipped []string
+	var unhealthy []string
 	for i := 0; i < len(instances); {
 		if instances[i].IsHealthy(op, heartbeatTimeout, now) {
 			i++
 		} else {
-			skipped = append(skipped, instances[i].Addr)
+			unhealthy = append(unhealthy, instances[i].Addr)
 			instances = append(instances[:i], instances[i+1:]...)
 		}
 	}
@@ -58,17 +53,15 @@ func (s *defaultReplicationStrategy) Filter(instances []InstanceDesc, op Operati
 	// after filtering out dead ones, don't even bother trying.
 	if len(instances) < minSuccess {
 		var err error
+		var unhealthyStr string
+		if len(unhealthy) > 0 {
+			unhealthyStr = fmt.Sprintf(" - unhealthy instances: %s", strings.Join(unhealthy, ","))
+		}
 
 		if zoneAwarenessEnabled {
-			level.Error(s.logger).Log("msg",
-				fmt.Sprintf("at least %d live replicas required across different availability zones, could only find %d",
-					minSuccess, len(instances)), "unhealthy", skipped)
-			err = fmt.Errorf("at least %d live replicas required across different availability zones, could only find %d", minSuccess, len(instances))
+			err = fmt.Errorf("at least %d live replicas required across different availability zones, could only find %d%s", minSuccess, len(instances), unhealthyStr)
 		} else {
-			level.Error(s.logger).Log("msg",
-				fmt.Sprintf("at least %d live replicas required, could only find %d",
-					minSuccess, len(instances)), "unhealthy", skipped)
-			err = fmt.Errorf("at least %d live replicas required, could only find %d", minSuccess, len(instances))
+			err = fmt.Errorf("at least %d live replicas required, could only find %d%s", minSuccess, len(instances), unhealthyStr)
 		}
 
 		return nil, 0, err
@@ -77,31 +70,32 @@ func (s *defaultReplicationStrategy) Filter(instances []InstanceDesc, op Operati
 	return instances, len(instances) - minSuccess, nil
 }
 
-type ignoreUnhealthyInstancesReplicationStrategy struct {
-	logger log.Logger
-}
+type ignoreUnhealthyInstancesReplicationStrategy struct{}
 
-func NewIgnoreUnhealthyInstancesReplicationStrategy(logger log.Logger) ReplicationStrategy {
-	return &ignoreUnhealthyInstancesReplicationStrategy{logger: logger}
+func NewIgnoreUnhealthyInstancesReplicationStrategy() ReplicationStrategy {
+	return &ignoreUnhealthyInstancesReplicationStrategy{}
 }
 
 func (r *ignoreUnhealthyInstancesReplicationStrategy) Filter(instances []InstanceDesc, op Operation, _ int, heartbeatTimeout time.Duration, _ bool) (healthy []InstanceDesc, maxFailures int, err error) {
 	now := time.Now()
 	// Filter out unhealthy instances.
-	var skipped []string
+	var unhealthy []string
 	for i := 0; i < len(instances); {
 		if instances[i].IsHealthy(op, heartbeatTimeout, now) {
 			i++
 		} else {
-			skipped = append(skipped, instances[i].Addr)
+			unhealthy = append(unhealthy, instances[i].Addr)
 			instances = append(instances[:i], instances[i+1:]...)
 		}
 	}
 
 	// We need at least 1 healthy instance no matter what is the replication factor set to.
 	if len(instances) == 0 {
-		level.Error(r.logger).Log("msg", "failed to find any healthy ring replicas", "unhealthy", skipped)
-		return nil, 0, errors.New("at least 1 healthy replica required, could only find 0")
+		var unhealthyStr string
+		if len(unhealthy) > 0 {
+			unhealthyStr = fmt.Sprintf(" - unhealthy instances: %s", strings.Join(unhealthy, ","))
+		}
+		return nil, 0, fmt.Errorf("at least 1 healthy replica required, could only find 0%s", unhealthyStr)
 	}
 
 	return instances, len(instances) - 1, nil
diff --git a/ring/replication_strategy_test.go b/ring/replication_strategy_test.go
index 18e65c1eb..1fe5d0e91 100644
--- a/ring/replication_strategy_test.go
+++ b/ring/replication_strategy_test.go
@@ -5,7 +5,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/go-kit/log"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -25,7 +24,7 @@ func TestRingReplicationStrategy(t *testing.T) {
 		{
 			replicationFactor: 1,
 			deadIngesters:     1,
-			expectedError:     "at least 1 live replicas required, could only find 0",
+			expectedError:     "at least 1 live replicas required, could only find 0 - unhealthy instances: dead1",
 		},
 
 		// Ensure it works for RF=3 and 2 ingesters.
@@ -53,7 +52,7 @@
 			replicationFactor: 3,
 			liveIngesters:     1,
 			deadIngesters:     2,
-			expectedError:     "at least 2 live replicas required, could only find 1",
+			expectedError:     "at least 2 live replicas required, could only find 1 - unhealthy instances: dead1,dead2",
 		},
 
 		// Ensure it works when adding / removing nodes.
@@ -76,7 +75,7 @@ func TestRingReplicationStrategy(t *testing.T) {
 			replicationFactor: 3,
 			liveIngesters:     2,
 			deadIngesters:     2,
-			expectedError:     "at least 3 live replicas required, could only find 2",
+			expectedError:     "at least 3 live replicas required, could only find 2 - unhealthy instances: dead1,dead2",
 		},
 	} {
 		ingesters := []InstanceDesc{}
@@ -86,11 +85,11 @@ func TestRingReplicationStrategy(t *testing.T) {
 			})
 		}
 		for i := 0; i < tc.deadIngesters; i++ {
-			ingesters = append(ingesters, InstanceDesc{})
+			ingesters = append(ingesters, InstanceDesc{Addr: fmt.Sprintf("dead%d", i+1)})
 		}
 
 		t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) {
-			strategy := NewDefaultReplicationStrategy(log.NewNopLogger())
+			strategy := NewDefaultReplicationStrategy()
 			liveIngesters, maxFailure, err := strategy.Filter(ingesters, Read, tc.replicationFactor, 100*time.Second, false)
 			if tc.expectedError == "" {
 				assert.NoError(t, err)
@@ -138,7 +137,7 @@ func TestIgnoreUnhealthyInstancesReplicationStrategy(t *testing.T) {
 			liveIngesters:      0,
 			deadIngesters:      3,
 			expectedMaxFailure: 0,
-			expectedError:      "at least 1 healthy replica required, could only find 0",
+			expectedError:      "at least 1 healthy replica required, could only find 0 - unhealthy instances: dead1,dead2,dead3",
 		},
 	} {
 		ingesters := []InstanceDesc{}
@@ -148,11 +147,11 @@ func TestIgnoreUnhealthyInstancesReplicationStrategy(t *testing.T) {
 			})
 		}
 		for i := 0; i < tc.deadIngesters; i++ {
-			ingesters = append(ingesters, InstanceDesc{})
+			ingesters = append(ingesters, InstanceDesc{Addr: fmt.Sprintf("dead%d", i+1)})
 		}
 
 		t.Run(tc.name, func(t *testing.T) {
-			strategy := NewIgnoreUnhealthyInstancesReplicationStrategy(log.NewNopLogger())
+			strategy := NewIgnoreUnhealthyInstancesReplicationStrategy()
 			liveIngesters, maxFailure, err := strategy.Filter(ingesters, Read, 3, 100*time.Second, false)
 			if tc.expectedError == "" {
 				assert.NoError(t, err)
diff --git a/ring/ring.go b/ring/ring.go
index 4e5061f94..63e3a547c 100644
--- a/ring/ring.go
+++ b/ring/ring.go
@@ -221,7 +221,7 @@ func New(cfg Config, name, key string, logger log.Logger, reg prometheus.Registe
 		return nil, err
 	}
 
-	return NewWithStoreClientAndStrategy(cfg, name, key, store, NewDefaultReplicationStrategy(logger), reg, logger)
+	return NewWithStoreClientAndStrategy(cfg, name, key, store, NewDefaultReplicationStrategy(), reg, logger)
 }
 
 func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client, strategy ReplicationStrategy, reg prometheus.Registerer, logger log.Logger) (*Ring, error) {
diff --git a/ring/ring_test.go b/ring/ring_test.go
index 1225496ee..82193753c 100644
--- a/ring/ring_test.go
+++ b/ring/ring_test.go
@@ -55,7 +55,7 @@ func benchmarkBatch(b *testing.B, numInstances, numKeys int) {
 	r := Ring{
 		cfg:      cfg,
 		ringDesc: desc,
-		strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
+		strategy: NewDefaultReplicationStrategy(),
 	}
 
 	ctx := context.Background()
@@ -96,7 +96,7 @@ func TestDoBatchZeroInstances(t *testing.T) {
 	r := Ring{
 		cfg:      Config{},
 		ringDesc: desc,
-		strategy: NewDefaultReplicationStrategy(log.NewNopLogger()),
+		strategy: NewDefaultReplicationStrategy(),
 	}
 	require.Error(t, DoBatch(ctx, Write, &r, keys, callback, cleanup))
 }
@@ -187,7 +187,7 @@ func TestRing_Get_ZoneAwarenessWithIngesterLeaving(t *testing.T) {
 				ringTokensByZone:    r.getTokensByZone(),
 				ringInstanceByToken: r.getTokensInfo(),
 				ringZones:           getZones(r.getTokensByZone()),
-				strategy:            NewDefaultReplicationStrategy(log.NewNopLogger()),
+				strategy:            NewDefaultReplicationStrategy(),
 			}
 
 			_, bufHosts, bufZones := MakeBuffersForGet()
@@ -279,7 +279,7 @@ func TestRing_Get_ZoneAwareness(t *testing.T) {
 				ringTokensByZone:    r.getTokensByZone(),
 				ringInstanceByToken: r.getTokensInfo(),
 				ringZones:           getZones(r.getTokensByZone()),
-				strategy:            NewDefaultReplicationStrategy(log.NewNopLogger()),
+				strategy:            NewDefaultReplicationStrategy(),
 			}
 
 			instances := make([]InstanceDesc, 0, len(r.GetIngesters()))
@@ -373,7 +373,7 @@ func TestRing_GetAllHealthy(t *testing.T) {
 				ringTokensByZone:    ringDesc.getTokensByZone(),
 				ringInstanceByToken: ringDesc.getTokensInfo(),
 				ringZones:           getZones(ringDesc.getTokensByZone()),
-				strategy:            NewDefaultReplicationStrategy(log.NewNopLogger()),
+				strategy:            NewDefaultReplicationStrategy(),
 			}
 
 			set, err := ring.GetAllHealthy(Read)
@@ -503,7 +503,7 @@ func TestRing_GetReplicationSetForOperation(t *testing.T) {
 				ringTokensByZone:    ringDesc.getTokensByZone(),
 				ringInstanceByToken: ringDesc.getTokensInfo(),
 				ringZones:           getZones(ringDesc.getTokensByZone()),
-				strategy:            NewDefaultReplicationStrategy(log.NewNopLogger()),
+				strategy:            NewDefaultReplicationStrategy(),
 			}
 
 			set, err := ring.GetReplicationSetForOperation(Read)
@@ -821,7 +821,7 @@ func TestRing_GetReplicationSetForOperation_WithZoneAwarenessEnabled(t *testing.
 				ringTokensByZone:    ringDesc.getTokensByZone(),
 				ringInstanceByToken: ringDesc.getTokensInfo(),
 				ringZones:           getZones(ringDesc.getTokensByZone()),
-				strategy:            NewDefaultReplicationStrategy(log.NewNopLogger()),
+				strategy:            NewDefaultReplicationStrategy(),
 			}
 
 			// Check the replication set has the correct settings
@@ -957,7 +957,7 @@ func TestRing_ShuffleShard(t *testing.T) {
 				ringTokensByZone:    ringDesc.getTokensByZone(),
 				ringInstanceByToken: ringDesc.getTokensInfo(),
 				ringZones:           getZones(ringDesc.getTokensByZone()),
-				strategy:            NewDefaultReplicationStrategy(log.NewNopLogger()),
+				strategy:            NewDefaultReplicationStrategy(),
 			}
 
 			shardRing := ring.ShuffleShard("tenant-id", testData.shardSize)
@@ -1009,7 +1009,7 @@ func TestRing_ShuffleShard_Stability(t *testing.T) {
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(log.NewNopLogger()),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	for i := 1; i <= numTenants; i++ {
@@ -1077,7 +1077,7 @@ func TestRing_ShuffleShard_Shuffling(t *testing.T) {
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(log.NewNopLogger()),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	// Compute the shard for each tenant.
@@ -1176,7 +1176,7 @@ func TestRing_ShuffleShard_Consistency(t *testing.T) {
 				ringTokensByZone:    ringDesc.getTokensByZone(),
 				ringInstanceByToken: ringDesc.getTokensInfo(),
 				ringZones:           getZones(ringDesc.getTokensByZone()),
-				strategy:            NewDefaultReplicationStrategy(log.NewNopLogger()),
+				strategy:            NewDefaultReplicationStrategy(),
 			}
 
 			// Compute the initial shard for each tenant.
@@ -1240,7 +1240,7 @@ func TestRing_ShuffleShard_ConsistencyOnShardSizeChanged(t *testing.T) {
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(log.NewNopLogger()),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	// Get the replication set with shard size = 3.
@@ -1317,7 +1317,7 @@ func TestRing_ShuffleShard_ConsistencyOnZonesChanged(t *testing.T) {
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(log.NewNopLogger()),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	// Get the replication set with shard size = 2.
@@ -1576,7 +1576,7 @@ func TestRing_ShuffleShardWithLookback(t *testing.T) {
 				ringTokensByZone:    ringDesc.getTokensByZone(),
 				ringInstanceByToken: ringDesc.getTokensInfo(),
 				ringZones:           getZones(ringDesc.getTokensByZone()),
-				strategy:            NewDefaultReplicationStrategy(log.NewNopLogger()),
+				strategy:            NewDefaultReplicationStrategy(),
 			}
 
 			// Replay the events on the timeline.
@@ -1641,7 +1641,7 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) {
 				ringTokensByZone:    ringDesc.getTokensByZone(),
 				ringInstanceByToken: ringDesc.getTokensInfo(),
 				ringZones:           getZones(ringDesc.getTokensByZone()),
-				strategy:            NewDefaultReplicationStrategy(log.NewNopLogger()),
+				strategy:            NewDefaultReplicationStrategy(),
 			}
 
 			// The simulation starts with the minimum shard size. Random events can later increase it.
@@ -1794,7 +1794,7 @@ func benchmarkShuffleSharding(b *testing.B, numInstances, numZones, numTokens, s
 		ringInstanceByToken:  ringDesc.getTokensInfo(),
 		ringZones:            getZones(ringDesc.getTokensByZone()),
 		shuffledSubringCache: map[subringCacheKey]*Ring{},
-		strategy:             NewDefaultReplicationStrategy(log.NewNopLogger()),
+		strategy:             NewDefaultReplicationStrategy(),
 		lastTopologyChange:   time.Now(),
 	}
 
@@ -1822,7 +1822,7 @@ func BenchmarkRing_Get(b *testing.B) {
 		ringInstanceByToken:  ringDesc.getTokensInfo(),
 		ringZones:            getZones(ringDesc.getTokensByZone()),
 		shuffledSubringCache: map[subringCacheKey]*Ring{},
-		strategy:             NewDefaultReplicationStrategy(log.NewNopLogger()),
+		strategy:             NewDefaultReplicationStrategy(),
 		lastTopologyChange:   time.Now(),
 	}
 
@@ -1850,7 +1850,7 @@ func TestRing_Get_NoMemoryAllocations(t *testing.T) {
 		ringInstanceByToken:  ringDesc.getTokensInfo(),
 		ringZones:            getZones(ringDesc.getTokensByZone()),
 		shuffledSubringCache: map[subringCacheKey]*Ring{},
-		strategy:             NewDefaultReplicationStrategy(log.NewNopLogger()),
+		strategy:             NewDefaultReplicationStrategy(),
 		lastTopologyChange:   time.Now(),
 	}
 
diff --git a/ring/util_test.go b/ring/util_test.go
index 74b5ca723..9a4a69c69 100644
--- a/ring/util_test.go
+++ b/ring/util_test.go
@@ -6,7 +6,6 @@ import (
 	"testing"
 	"time"
 
-	"github.com/go-kit/log"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/assert"
@@ -121,7 +120,7 @@ func TestWaitRingStabilityShouldReturnAsSoonAsMinStabilityIsReachedOnNoChanges(t
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(log.NewNopLogger()),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	startTime := time.Now()
@@ -156,7 +155,7 @@ func TestWaitRingStabilityShouldReturnOnceMinStabilityHasBeenReached(t *testing.
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(log.NewNopLogger()),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	// Add 1 new instance after some time.
@@ -207,7 +206,7 @@ func TestWaitRingStabilityShouldReturnErrorIfMaxWaitingIsReached(t *testing.T) {
 		ringTokensByZone:    ringDesc.getTokensByZone(),
 		ringInstanceByToken: ringDesc.getTokensInfo(),
 		ringZones:           getZones(ringDesc.getTokensByZone()),
-		strategy:            NewDefaultReplicationStrategy(log.NewNopLogger()),
+		strategy:            NewDefaultReplicationStrategy(),
 	}
 
 	// Keep changing the ring.
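
For reviewers, a minimal sketch of the caller-visible change (the main wrapper, the example.com/dskit/ring import path, and the instance values are illustrative only, not part of this diff): the replication strategies are now constructed without a logger, and the addresses of unhealthy instances are carried in the returned error instead of being logged from inside Filter.

package main

import (
	"fmt"
	"time"

	"example.com/dskit/ring" // illustrative import path; substitute this module's real path
)

func main() {
	// One instance with a fresh heartbeat and one that never heartbeated,
	// mirroring the "dead" instances built in the updated tests.
	instances := []ring.InstanceDesc{
		{Addr: "live1", Timestamp: time.Now().Unix()},
		{Addr: "dead1"},
	}

	// The constructor no longer takes a logger.
	strategy := ring.NewDefaultReplicationStrategy()

	// With RF=3 the strategy needs a quorum of 2 live replicas; only 1 is
	// healthy, so Filter fails and the error now names the unhealthy instance:
	// "at least 2 live replicas required, could only find 1 - unhealthy instances: dead1"
	_, _, err := strategy.Filter(instances, ring.Read, 3, 100*time.Second, false)
	fmt.Println(err)
}

The same pattern applies to NewIgnoreUnhealthyInstancesReplicationStrategy(). Folding the unhealthy addresses into the error keeps Filter free of logging dependencies and lets each caller decide how to surface them, which is why the go-kit log imports and the nop loggers in the tests could be dropped.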