diff --git a/ring/replication_strategy.go b/ring/replication_strategy.go index e572cb77a..4ff6f86ae 100644 --- a/ring/replication_strategy.go +++ b/ring/replication_strategy.go @@ -4,6 +4,8 @@ import ( "fmt" "time" + "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/pkg/errors" ) @@ -14,10 +16,12 @@ type ReplicationStrategy interface { Filter(instances []InstanceDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration, zoneAwarenessEnabled bool) (healthy []InstanceDesc, maxFailures int, err error) } -type defaultReplicationStrategy struct{} +type defaultReplicationStrategy struct { + logger log.Logger +} -func NewDefaultReplicationStrategy() ReplicationStrategy { - return &defaultReplicationStrategy{} +func NewDefaultReplicationStrategy(logger log.Logger) ReplicationStrategy { + return &defaultReplicationStrategy{logger: logger} } // Filter decides, given the set of instances eligible for a key, @@ -40,10 +44,12 @@ func (s *defaultReplicationStrategy) Filter(instances []InstanceDesc, op Operati // Skip those that have not heartbeated in a while. NB these are still // included in the calculation of minSuccess, so if too many failed instances // will cause the whole write to fail. + var skipped []string for i := 0; i < len(instances); { if instances[i].IsHealthy(op, heartbeatTimeout, now) { i++ } else { + skipped = append(skipped, instances[i].Addr) instances = append(instances[:i], instances[i+1:]...) } } @@ -54,8 +60,14 @@ func (s *defaultReplicationStrategy) Filter(instances []InstanceDesc, op Operati var err error if zoneAwarenessEnabled { + level.Error(s.logger).Log("msg", + fmt.Sprintf("at least %d live replicas required across different availability zones, could only find %d", + minSuccess, len(instances)), "unhealthy", skipped) err = fmt.Errorf("at least %d live replicas required across different availability zones, could only find %d", minSuccess, len(instances)) } else { + level.Error(s.logger).Log("msg", + fmt.Sprintf("at least %d live replicas required, could only find %d", + minSuccess, len(instances)), "unhealthy", skipped) err = fmt.Errorf("at least %d live replicas required, could only find %d", minSuccess, len(instances)) } @@ -65,25 +77,30 @@ func (s *defaultReplicationStrategy) Filter(instances []InstanceDesc, op Operati return instances, len(instances) - minSuccess, nil } -type ignoreUnhealthyInstancesReplicationStrategy struct{} +type ignoreUnhealthyInstancesReplicationStrategy struct { + logger log.Logger +} -func NewIgnoreUnhealthyInstancesReplicationStrategy() ReplicationStrategy { - return &ignoreUnhealthyInstancesReplicationStrategy{} +func NewIgnoreUnhealthyInstancesReplicationStrategy(logger log.Logger) ReplicationStrategy { + return &ignoreUnhealthyInstancesReplicationStrategy{logger: logger} } func (r *ignoreUnhealthyInstancesReplicationStrategy) Filter(instances []InstanceDesc, op Operation, _ int, heartbeatTimeout time.Duration, _ bool) (healthy []InstanceDesc, maxFailures int, err error) { now := time.Now() // Filter out unhealthy instances. + var skipped []string for i := 0; i < len(instances); { if instances[i].IsHealthy(op, heartbeatTimeout, now) { i++ } else { + skipped = append(skipped, instances[i].Addr) instances = append(instances[:i], instances[i+1:]...) } } // We need at least 1 healthy instance no matter what is the replication factor set to. if len(instances) == 0 { + level.Error(r.logger).Log("msg", "failed to find any healthy ring replicas", "unhealthy", skipped) return nil, 0, errors.New("at least 1 healthy replica required, could only find 0") } diff --git a/ring/replication_strategy_test.go b/ring/replication_strategy_test.go index 0bce73350..18e65c1eb 100644 --- a/ring/replication_strategy_test.go +++ b/ring/replication_strategy_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/go-kit/log" "github.com/stretchr/testify/assert" ) @@ -89,7 +90,7 @@ func TestRingReplicationStrategy(t *testing.T) { } t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { - strategy := NewDefaultReplicationStrategy() + strategy := NewDefaultReplicationStrategy(log.NewNopLogger()) liveIngesters, maxFailure, err := strategy.Filter(ingesters, Read, tc.replicationFactor, 100*time.Second, false) if tc.expectedError == "" { assert.NoError(t, err) @@ -151,7 +152,7 @@ func TestIgnoreUnhealthyInstancesReplicationStrategy(t *testing.T) { } t.Run(tc.name, func(t *testing.T) { - strategy := NewIgnoreUnhealthyInstancesReplicationStrategy() + strategy := NewIgnoreUnhealthyInstancesReplicationStrategy(log.NewNopLogger()) liveIngesters, maxFailure, err := strategy.Filter(ingesters, Read, 3, 100*time.Second, false) if tc.expectedError == "" { assert.NoError(t, err) diff --git a/ring/ring.go b/ring/ring.go index 63e3a547c..4e5061f94 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -221,7 +221,7 @@ func New(cfg Config, name, key string, logger log.Logger, reg prometheus.Registe return nil, err } - return NewWithStoreClientAndStrategy(cfg, name, key, store, NewDefaultReplicationStrategy(), reg, logger) + return NewWithStoreClientAndStrategy(cfg, name, key, store, NewDefaultReplicationStrategy(logger), reg, logger) } func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client, strategy ReplicationStrategy, reg prometheus.Registerer, logger log.Logger) (*Ring, error) { diff --git a/ring/ring_test.go b/ring/ring_test.go index 82193753c..1225496ee 100644 --- a/ring/ring_test.go +++ b/ring/ring_test.go @@ -55,7 +55,7 @@ func benchmarkBatch(b *testing.B, numInstances, numKeys int) { r := Ring{ cfg: cfg, ringDesc: desc, - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), } ctx := context.Background() @@ -96,7 +96,7 @@ func TestDoBatchZeroInstances(t *testing.T) { r := Ring{ cfg: Config{}, ringDesc: desc, - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), } require.Error(t, DoBatch(ctx, Write, &r, keys, callback, cleanup)) } @@ -187,7 +187,7 @@ func TestRing_Get_ZoneAwarenessWithIngesterLeaving(t *testing.T) { ringTokensByZone: r.getTokensByZone(), ringInstanceByToken: r.getTokensInfo(), ringZones: getZones(r.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), } _, bufHosts, bufZones := MakeBuffersForGet() @@ -279,7 +279,7 @@ func TestRing_Get_ZoneAwareness(t *testing.T) { ringTokensByZone: r.getTokensByZone(), ringInstanceByToken: r.getTokensInfo(), ringZones: getZones(r.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), } instances := make([]InstanceDesc, 0, len(r.GetIngesters())) @@ -373,7 +373,7 @@ func TestRing_GetAllHealthy(t *testing.T) { ringTokensByZone: ringDesc.getTokensByZone(), ringInstanceByToken: ringDesc.getTokensInfo(), ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), } set, err := ring.GetAllHealthy(Read) @@ -503,7 +503,7 @@ func TestRing_GetReplicationSetForOperation(t *testing.T) { ringTokensByZone: ringDesc.getTokensByZone(), ringInstanceByToken: ringDesc.getTokensInfo(), ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), } set, err := ring.GetReplicationSetForOperation(Read) @@ -821,7 +821,7 @@ func TestRing_GetReplicationSetForOperation_WithZoneAwarenessEnabled(t *testing. ringTokensByZone: ringDesc.getTokensByZone(), ringInstanceByToken: ringDesc.getTokensInfo(), ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), } // Check the replication set has the correct settings @@ -957,7 +957,7 @@ func TestRing_ShuffleShard(t *testing.T) { ringTokensByZone: ringDesc.getTokensByZone(), ringInstanceByToken: ringDesc.getTokensInfo(), ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), } shardRing := ring.ShuffleShard("tenant-id", testData.shardSize) @@ -1009,7 +1009,7 @@ func TestRing_ShuffleShard_Stability(t *testing.T) { ringTokensByZone: ringDesc.getTokensByZone(), ringInstanceByToken: ringDesc.getTokensInfo(), ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), } for i := 1; i <= numTenants; i++ { @@ -1077,7 +1077,7 @@ func TestRing_ShuffleShard_Shuffling(t *testing.T) { ringTokensByZone: ringDesc.getTokensByZone(), ringInstanceByToken: ringDesc.getTokensInfo(), ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), } // Compute the shard for each tenant. @@ -1176,7 +1176,7 @@ func TestRing_ShuffleShard_Consistency(t *testing.T) { ringTokensByZone: ringDesc.getTokensByZone(), ringInstanceByToken: ringDesc.getTokensInfo(), ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), } // Compute the initial shard for each tenant. @@ -1240,7 +1240,7 @@ func TestRing_ShuffleShard_ConsistencyOnShardSizeChanged(t *testing.T) { ringTokensByZone: ringDesc.getTokensByZone(), ringInstanceByToken: ringDesc.getTokensInfo(), ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), } // Get the replication set with shard size = 3. @@ -1317,7 +1317,7 @@ func TestRing_ShuffleShard_ConsistencyOnZonesChanged(t *testing.T) { ringTokensByZone: ringDesc.getTokensByZone(), ringInstanceByToken: ringDesc.getTokensInfo(), ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), } // Get the replication set with shard size = 2. @@ -1576,7 +1576,7 @@ func TestRing_ShuffleShardWithLookback(t *testing.T) { ringTokensByZone: ringDesc.getTokensByZone(), ringInstanceByToken: ringDesc.getTokensInfo(), ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), } // Replay the events on the timeline. @@ -1641,7 +1641,7 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) { ringTokensByZone: ringDesc.getTokensByZone(), ringInstanceByToken: ringDesc.getTokensInfo(), ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), } // The simulation starts with the minimum shard size. Random events can later increase it. @@ -1794,7 +1794,7 @@ func benchmarkShuffleSharding(b *testing.B, numInstances, numZones, numTokens, s ringInstanceByToken: ringDesc.getTokensInfo(), ringZones: getZones(ringDesc.getTokensByZone()), shuffledSubringCache: map[subringCacheKey]*Ring{}, - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), lastTopologyChange: time.Now(), } @@ -1822,7 +1822,7 @@ func BenchmarkRing_Get(b *testing.B) { ringInstanceByToken: ringDesc.getTokensInfo(), ringZones: getZones(ringDesc.getTokensByZone()), shuffledSubringCache: map[subringCacheKey]*Ring{}, - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), lastTopologyChange: time.Now(), } @@ -1850,7 +1850,7 @@ func TestRing_Get_NoMemoryAllocations(t *testing.T) { ringInstanceByToken: ringDesc.getTokensInfo(), ringZones: getZones(ringDesc.getTokensByZone()), shuffledSubringCache: map[subringCacheKey]*Ring{}, - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), lastTopologyChange: time.Now(), } diff --git a/ring/util_test.go b/ring/util_test.go index 9a4a69c69..74b5ca723 100644 --- a/ring/util_test.go +++ b/ring/util_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/go-kit/log" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" @@ -120,7 +121,7 @@ func TestWaitRingStabilityShouldReturnAsSoonAsMinStabilityIsReachedOnNoChanges(t ringTokensByZone: ringDesc.getTokensByZone(), ringInstanceByToken: ringDesc.getTokensInfo(), ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), } startTime := time.Now() @@ -155,7 +156,7 @@ func TestWaitRingStabilityShouldReturnOnceMinStabilityHasBeenReached(t *testing. ringTokensByZone: ringDesc.getTokensByZone(), ringInstanceByToken: ringDesc.getTokensInfo(), ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), } // Add 1 new instance after some time. @@ -206,7 +207,7 @@ func TestWaitRingStabilityShouldReturnErrorIfMaxWaitingIsReached(t *testing.T) { ringTokensByZone: ringDesc.getTokensByZone(), ringInstanceByToken: ringDesc.getTokensInfo(), ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), + strategy: NewDefaultReplicationStrategy(log.NewNopLogger()), } // Keep changing the ring.