Skip to content

Commit

Permalink
Simplify code from review comments
Browse files Browse the repository at this point in the history
Signed-off-by: György Krajcsovits <[email protected]>
  • Loading branch information
krajorama committed Feb 7, 2023
1 parent 04be313 commit 6731606
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 20 deletions.
20 changes: 8 additions & 12 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -906,7 +906,6 @@ func (d *Distributor) prePushValidationMiddleware(next push.Func) push.Func {
validatedMetadata := 0
validatedSamples := 0
validatedExemplars := 0
validatedHistograms := 0

// Find the earliest and latest samples in the batch.
earliestSampleTimestampMs, latestSampleTimestampMs := int64(math.MaxInt64), int64(0)
Expand Down Expand Up @@ -959,9 +958,8 @@ func (d *Distributor) prePushValidationMiddleware(next push.Func) push.Func {
continue
}

validatedSamples += len(ts.Samples)
validatedSamples += len(ts.Samples) + len(ts.Histograms)
validatedExemplars += len(ts.Exemplars)
validatedHistograms += len(ts.Histograms)
}
if len(removeIndexes) > 0 {
for _, removeIndex := range removeIndexes {
Expand Down Expand Up @@ -989,14 +987,14 @@ func (d *Distributor) prePushValidationMiddleware(next push.Func) push.Func {
req.Metadata = util.RemoveSliceIndexes(req.Metadata, removeIndexes)
}

if validatedSamples == 0 && validatedHistograms == 0 && validatedMetadata == 0 {
if validatedSamples == 0 && validatedMetadata == 0 {
return &mimirpb.WriteResponse{}, firstPartialErr
}

totalN := validatedSamples + validatedExemplars + validatedHistograms + validatedMetadata
totalN := validatedSamples + validatedExemplars + validatedMetadata
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
if validatedSamples+validatedHistograms > 0 {
d.discardedSamplesRateLimited.WithLabelValues(userID, group).Add(float64(validatedSamples + validatedHistograms))
if validatedSamples > 0 {
d.discardedSamplesRateLimited.WithLabelValues(userID, group).Add(float64(validatedSamples))
}
if validatedExemplars > 0 {
d.discardedExemplarsRateLimited.WithLabelValues(userID).Add(float64(validatedExemplars))
Expand Down Expand Up @@ -1145,16 +1143,14 @@ func (d *Distributor) metricsMiddleware(next push.Func) push.Func {

numSamples := 0
numExemplars := 0
numHistograms := 0
for _, ts := range req.Timeseries {
numSamples += len(ts.Samples)
numSamples += len(ts.Samples) + len(ts.Histograms)
numExemplars += len(ts.Exemplars)
numHistograms += len(ts.Histograms)
}

d.incomingRequests.WithLabelValues(userID).Inc()
if numSamples+numHistograms > 0 {
d.incomingSamples.WithLabelValues(userID).Add(float64(numSamples + numHistograms))
if numSamples > 0 {
d.incomingSamples.WithLabelValues(userID).Add(float64(numSamples))
}
if numExemplars > 0 {
d.incomingExemplars.WithLabelValues(userID).Add(float64(numExemplars))
Expand Down
8 changes: 2 additions & 6 deletions pkg/distributor/forwarding/forwarding.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,7 @@ func (r *request) doHTTP(ctx context.Context, body []byte) error {
httpReq.Header.Set(user.OrgIDHeaderName, r.user)

r.requests.Inc()
if r.counts.SampleCount > 0 {
r.samples.Add(float64(r.counts.SampleCount))
}
r.samples.Add(float64(r.counts.SampleCount))
r.exemplars.Add(float64(r.counts.ExemplarCount))

beforeTs := time.Now()
Expand Down Expand Up @@ -594,9 +592,7 @@ func (r *request) doHTTPGrpc(ctx context.Context, body []byte) error {
h := c.(httpgrpc.HTTPClient)

r.requests.Inc()
if r.counts.SampleCount > 0 {
r.samples.Add(float64(r.counts.SampleCount))
}
r.samples.Add(float64(r.counts.SampleCount))
r.exemplars.Add(float64(r.counts.ExemplarCount))

beforeTs := time.Now()
Expand Down
4 changes: 2 additions & 2 deletions pkg/distributor/forwarding/forwarding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestForwardingSamplesSuccessfullyToSingleTarget(t *testing.T) {
cortex_distributor_forward_requests_total{} 1
# HELP cortex_distributor_forward_samples_total The total number of samples the Distributor forwarded.
# TYPE cortex_distributor_forward_samples_total counter
cortex_distributor_forward_samples_total 4
cortex_distributor_forward_samples_total{} 4
`

require.NoError(t, testutil.GatherAndCompare(
Expand Down Expand Up @@ -926,7 +926,7 @@ func TestForwardingToHTTPGrpcTarget(t *testing.T) {
cortex_distributor_forward_requests_total{} 1
# HELP cortex_distributor_forward_samples_total The total number of samples the Distributor forwarded.
# TYPE cortex_distributor_forward_samples_total counter
cortex_distributor_forward_samples_total 4
cortex_distributor_forward_samples_total{} 4
# TYPE cortex_distributor_forward_grpc_clients gauge
# HELP cortex_distributor_forward_grpc_clients Number of gRPC clients used by Distributor forwarder.
cortex_distributor_forward_grpc_clients 1
Expand Down

0 comments on commit 6731606

Please sign in to comment.