-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
query: cleanup store statuses as they come and go #910
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -81,12 +81,13 @@ type StoreSet struct { | |
dialOpts []grpc.DialOption | ||
gRPCInfoCallTimeout time.Duration | ||
|
||
mtx sync.RWMutex | ||
storesStatusesMtx sync.RWMutex | ||
stores map[string]*storeRef | ||
storeNodeConnections prometheus.Gauge | ||
externalLabelStores map[string]int | ||
storeStatuses map[string]*StoreStatus | ||
mtx sync.RWMutex | ||
storesStatusesMtx sync.RWMutex | ||
stores map[string]*storeRef | ||
storeNodeConnections prometheus.Gauge | ||
externalLabelStores map[string]int | ||
storeStatuses map[string]*StoreStatus | ||
unhealthyStoreTimeout time.Duration | ||
} | ||
|
||
type storeSetNodeCollector struct { | ||
|
@@ -118,6 +119,7 @@ func NewStoreSet( | |
reg *prometheus.Registry, | ||
storeSpecs func() []StoreSpec, | ||
dialOpts []grpc.DialOption, | ||
unhealthyStoreTimeout time.Duration, | ||
) *StoreSet { | ||
storeNodeConnections := prometheus.NewGauge(prometheus.GaugeOpts{ | ||
Name: "thanos_store_nodes_grpc_connections", | ||
|
@@ -135,14 +137,15 @@ func NewStoreSet( | |
} | ||
|
||
ss := &StoreSet{ | ||
logger: log.With(logger, "component", "storeset"), | ||
storeSpecs: storeSpecs, | ||
dialOpts: dialOpts, | ||
storeNodeConnections: storeNodeConnections, | ||
gRPCInfoCallTimeout: 10 * time.Second, | ||
externalLabelStores: map[string]int{}, | ||
stores: make(map[string]*storeRef), | ||
storeStatuses: make(map[string]*StoreStatus), | ||
logger: log.With(logger, "component", "storeset"), | ||
storeSpecs: storeSpecs, | ||
dialOpts: dialOpts, | ||
storeNodeConnections: storeNodeConnections, | ||
gRPCInfoCallTimeout: 10 * time.Second, | ||
externalLabelStores: map[string]int{}, | ||
stores: make(map[string]*storeRef), | ||
storeStatuses: make(map[string]*StoreStatus), | ||
unhealthyStoreTimeout: unhealthyStoreTimeout, | ||
} | ||
|
||
storeNodeCollector := &storeSetNodeCollector{externalLabelOccurrences: ss.externalLabelOccurrences} | ||
|
@@ -255,6 +258,7 @@ func (s *StoreSet) Update(ctx context.Context) { | |
} | ||
s.externalLabelStores = externalLabelStores | ||
s.storeNodeConnections.Set(float64(len(s.stores))) | ||
s.cleanUpStoreStatuses() | ||
} | ||
|
||
func (s *StoreSet) getHealthyStores(ctx context.Context) map[string]*storeRef { | ||
|
@@ -345,16 +349,23 @@ func (s *StoreSet) updateStoreStatus(store *storeRef, err error) { | |
s.storesStatusesMtx.Lock() | ||
defer s.storesStatusesMtx.Unlock() | ||
|
||
now := time.Now() | ||
s.storeStatuses[store.addr] = &StoreStatus{ | ||
Name: store.addr, | ||
LastError: err, | ||
LastCheck: now, | ||
Labels: store.labels, | ||
StoreType: store.storeType, | ||
MinTime: store.minTime, | ||
MaxTime: store.maxTime, | ||
status := StoreStatus{Name: store.addr} | ||
prev, ok := s.storeStatuses[store.addr] | ||
if ok { | ||
status = *prev | ||
} | ||
|
||
status.LastError = err | ||
status.LastCheck = time.Now() | ||
|
||
if err == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is quite weird to me. Essentially not consistent. What if
That's kind of inconsistent? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This might not be the most explicit way of doing things; I didn't want to add too many cases. Here's my logic:
Does that make sense? For the second point, I'm not sure I follow you. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I was talking about consistent error handling. You have two basic cases. 1 as talked through I think I am missing the fact that 2 does not exist because it was never healthy ;p Ignore me then. |
||
status.Labels = store.labels | ||
status.StoreType = store.storeType | ||
status.MinTime = store.minTime | ||
status.MaxTime = store.maxTime | ||
} | ||
|
||
s.storeStatuses[store.addr] = &status | ||
} | ||
|
||
func (s *StoreSet) GetStoreStatus() []StoreStatus { | ||
|
@@ -401,3 +412,17 @@ func (s *StoreSet) Close() { | |
st.close() | ||
} | ||
} | ||
|
||
func (s *StoreSet) cleanUpStoreStatuses() { | ||
s.storesStatusesMtx.Lock() | ||
defer s.storesStatusesMtx.Unlock() | ||
|
||
now := time.Now() | ||
for addr, status := range s.storeStatuses { | ||
if _, ok := s.stores[addr]; !ok { | ||
if now.Sub(status.LastCheck) >= s.unhealthyStoreTimeout { | ||
delete(s.storeStatuses, addr) | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's move to `sync.Mutex` if we don't use R and W separation (:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes !