Skip to content

Commit

Permalink
remove code smells
Browse files Browse the repository at this point in the history
  • Loading branch information
hitanshu-mehta committed Aug 18, 2021
1 parent fd90078 commit 8e23c4a
Showing 1 changed file with 54 additions and 40 deletions.
94 changes: 54 additions & 40 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,24 +48,24 @@ type EndpointSpec interface {
// given store connection.
Metadata(ctx context.Context, client infopb.InfoClient) (*endpointMetadata, error)

// StrictStatic returns true if the endpoint has been statically defined and it is under a strict mode.
StrictStatic() bool
// IsStrictStatic returns true if the endpoint has been statically defined and it is under a strict mode.
IsStrictStatic() bool
}

type grpcEndpointSpec struct {
addr string
strictstatic bool
addr string
isStrictStatic bool
}

// NewGRPCEndpointSpec creates gRPC endpoint spec.
// It uses InfoAPI to get Metadata.
func NewGRPCEndpointSpec(addr string, strictstatic bool) EndpointSpec {
return &grpcEndpointSpec{addr: addr, strictstatic: strictstatic}
func NewGRPCEndpointSpec(addr string, isStrictStatic bool) EndpointSpec {
return &grpcEndpointSpec{addr: addr, isStrictStatic: isStrictStatic}
}

// StrictStatic returns true if the endpoint has been statically defined and it is under a strict mode.
func (es *grpcEndpointSpec) StrictStatic() bool {
return es.strictstatic
// IsStrictStatic returns true if the endpoint has been statically defined and it is under a strict mode.
func (es *grpcEndpointSpec) IsStrictStatic() bool {
return es.isStrictStatic
}

func (es *grpcEndpointSpec) Addr() string {
Expand Down Expand Up @@ -262,17 +262,16 @@ func (e *EndpointSet) Update(ctx context.Context) {
level.Info(er.logger).Log("msg", unhealthyEndpointMessage, "address", addr, "extLset", labelpb.PromLabelSetsToString(er.LabelSets()))
}

// Add stores that are not yet in stores.
// Add endpoints that are not yet in activeEndpoints map.
for addr, er := range activeEndpoints {
if _, ok := endpoints[addr]; ok {
continue
}

extLset := labelpb.PromLabelSetsToString(er.LabelSets())

// All producers should have unique external labels. While this does not check only StoreAPIs connected to
// this querier this allows to notify early user about misconfiguration. Warn only. This is also detectable from metric.
if (er.ComponentType() == component.Sidecar || er.ComponentType() == component.Rule) &&
// All producers that expose StoreAPI should have unique external labels. Check all which connect to our Querier.
if er.HasStoreAPI() && (er.ComponentType() == component.Sidecar || er.ComponentType() == component.Rule) &&
stats[component.Sidecar][extLset]+stats[component.Rule][extLset] > 0 {

level.Warn(e.logger).Log("msg", "found duplicate storeAPI producer (sidecar or ruler). This is not advices as it will malform data in in the same bucket",
Expand All @@ -283,25 +282,7 @@ func (e *EndpointSet) Update(ctx context.Context) {
endpoints[addr] = er
e.updateEndpointStatus(er, nil)

if er.HasStoreAPI() {
level.Info(e.logger).Log("msg", "adding new storeAPI to query endpointset", "address", addr, "extLset", extLset)
}

if er.HasRulesAPI() {
level.Info(e.logger).Log("msg", "adding new rulesAPI to query endpointset", "address", addr)
}

if er.HasExemplarsAPI() {
level.Info(e.logger).Log("msg", "adding new exemplarsAPI to query endpointset", "address", addr)
}

if er.HasTargetsAPI() {
level.Info(e.logger).Log("msg", "adding new targetsAPI to query endpointset", "address", addr)
}

if er.HasMetricMetadataAPI() {
level.Info(e.logger).Log("msg", "adding new MetricMetadataAPI to query endpointset", "address", addr)
}
level.Info(e.logger).Log("msg", fmt.Sprintf("adding new %v with %+v", er.ComponentType(), er.apisPresent()), "address", addr, "extLset", extLset)
}

e.endpointsMetric.Update(stats)
Expand All @@ -312,7 +293,7 @@ func (e *EndpointSet) Update(ctx context.Context) {
e.cleanUpStoreStatuses(endpoints)
}

// Get returns a list of all active stores.
// GetStoreClients returns a list of all active stores.
func (e *EndpointSet) GetStoreClients() []storepb.StoreClient {
e.endpointsMtx.RLock()
defer e.endpointsMtx.RUnlock()
Expand Down Expand Up @@ -443,7 +424,7 @@ func (e *EndpointSet) getActiveEndpoints(ctx context.Context, endpoints map[stri

metadata, err := spec.Metadata(ctx, er.clients.info)
if err != nil {
if !seenAlready && !spec.StrictStatic() {
if !seenAlready && !spec.IsStrictStatic() {
// Close only if new and not a strict static node.
// Unactive `e.endpoints` will be closed later on.
er.Close()
Expand All @@ -452,7 +433,7 @@ func (e *EndpointSet) getActiveEndpoints(ctx context.Context, endpoints map[stri
e.updateEndpointStatus(er, err)
level.Warn(e.logger).Log("msg", "update of node failed", "err", errors.Wrap(err, "getting metadata"), "address", addr)

if !spec.StrictStatic() {
if !spec.IsStrictStatic() {
return
}

Expand Down Expand Up @@ -600,39 +581,46 @@ func (er *endpointRef) ComponentType() component.Component {
return component.FromString(er.metadata.ComponentType)
}

func (er *endpointRef) HasClients() bool {
er.mtx.RLock()
defer er.mtx.RUnlock()

return er.clients != nil
}

func (er *endpointRef) HasStoreAPI() bool {
er.mtx.RLock()
defer er.mtx.RUnlock()

return er.clients.store != nil
return er.HasClients() && er.clients.store != nil
}

func (er *endpointRef) HasRulesAPI() bool {
er.mtx.RLock()
defer er.mtx.RUnlock()

return er.clients.rule != nil
return er.HasClients() && er.clients.rule != nil
}

func (er *endpointRef) HasTargetsAPI() bool {
er.mtx.RLock()
defer er.mtx.RUnlock()

return er.clients.target != nil
return er.HasClients() && er.clients.target != nil
}

func (er *endpointRef) HasMetricMetadataAPI() bool {
er.mtx.RLock()
defer er.mtx.RUnlock()

return er.clients.metricMetadata != nil
return er.HasClients() && er.clients.metricMetadata != nil
}

func (er *endpointRef) HasExemplarsAPI() bool {
er.mtx.RLock()
defer er.mtx.RUnlock()

return er.clients.exemplar != nil
return er.HasClients() && er.clients.exemplar != nil
}

func (er *endpointRef) LabelSets() []labels.Labels {
Expand Down Expand Up @@ -682,6 +670,32 @@ func (er *endpointRef) Close() {
runutil.CloseWithLogOnErr(er.logger, er.cc, fmt.Sprintf("endpoint %v connection closed", er.addr))
}

func (er *endpointRef) apisPresent() []string {
var apisPresent []string

if er.HasStoreAPI() {
apisPresent = append(apisPresent, "storeAPI")
}

if er.HasRulesAPI() {
apisPresent = append(apisPresent, "rulesAPI")
}

if er.HasExemplarsAPI() {
apisPresent = append(apisPresent, "exemplarsAPI")
}

if er.HasTargetsAPI() {
apisPresent = append(apisPresent, "targetsAPI")
}

if er.HasMetricMetadataAPI() {
apisPresent = append(apisPresent, "MetricMetadataAPI")
}

return apisPresent
}

type endpointClients struct {
store storepb.StoreClient
rule rulespb.RulesClient
Expand Down

0 comments on commit 8e23c4a

Please sign in to comment.