receive/handler: implement tenant label splitting (thanos-io#7256)
* receive/handler: implement tenant label splitting

Implement splitting of incoming HTTP requests along a label inside the
timeseries themselves. This is useful when you have one big application
exposing lots of series and, for instance, a label `team` that identifies
the different owners of metrics in that application. With this change you
can use that `team` label to map series to different tenants in Thanos.
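
A rough, self-contained sketch of that splitting idea (the types, the
hard-coded `team` label, and the fallback tenant below are illustrative
only, not the actual handler code):

// Illustrative sketch only: not the actual Thanos receive handler code.
package main

import "fmt"

type label struct{ Name, Value string }

type timeSeries struct {
	Labels  []label
	Samples []float64
}

type writeRequest struct{ Timeseries []timeSeries }

// splitByLabel groups the series of one incoming write request by the value
// of splitLabel; series without that label fall back to defaultTenant.
func splitByLabel(req writeRequest, splitLabel, defaultTenant string) map[string]writeRequest {
	out := map[string]writeRequest{}
	for _, ts := range req.Timeseries {
		tenant := defaultTenant
		for _, l := range ts.Labels {
			if l.Name == splitLabel {
				tenant = l.Value
				break
			}
		}
		r := out[tenant]
		r.Timeseries = append(r.Timeseries, ts)
		out[tenant] = r
	}
	return out
}

func main() {
	req := writeRequest{Timeseries: []timeSeries{
		{Labels: []label{{"__name__", "http_requests_total"}, {"team", "foo"}}},
		{Labels: []label{{"__name__", "http_requests_total"}, {"team", "bar"}}},
		{Labels: []label{{"__name__", "up"}}},
	}}
	for tenant, r := range splitByLabel(req, "team", "default-tenant") {
		fmt.Printf("tenant %q gets %d series\n", tenant, len(r.Timeseries))
	}
}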

The only negative thing I could spot is that if one of the requests fails
after splitting, the resulting error code is used for all tenants, which
skews the Receiver metrics a little bit. I think that can be left as a
TODO task.

Signed-off-by: Giedrius Statkevičius <[email protected]>

* test/e2e: add more receiver tests

Signed-off-by: Giedrius Statkevičius <[email protected]>

* thanos/receive: note that splitting takes precedence over HTTP

Signed-off-by: Giedrius Statkevičius <[email protected]>

* thanos/receive: fix typo

Signed-off-by: Giedrius Statkevičius <[email protected]>

---------

Signed-off-by: Giedrius Statkevičius <[email protected]>
Signed-off-by: Giedrius Statkevičius <[email protected]>
GiedriusS authored and jnyi committed Oct 16, 2024
1 parent 6b93483 commit 3b207cc
Showing 1 changed file with 23 additions and 21 deletions.
pkg/receive/handler.go: 23 additions and 21 deletions
@@ -711,29 +711,31 @@ type remoteWriteParams struct {
     alreadyReplicated bool
 }

-func (h *Handler) gatherWriteStats(remoteWrites map[endpointReplica]map[string]trackedSeries) tenantRequestStats {
+func (h *Handler) gatherWriteStats(writes ...map[endpointReplica]map[string]trackedSeries) tenantRequestStats {
     var stats tenantRequestStats = make(tenantRequestStats)

-    for er := range remoteWrites {
-        if er.replica != 0 {
-            continue // Skip replicated writes, only count once.
-        }
-        for tenant, series := range remoteWrites[er] {
-            samples := 0
+    for _, write := range writes {
+        for er := range write {
+            if er.replica != 0 {
+                continue // Skip replicated writes, only count once.
+            }
+            for tenant, series := range write[er] {
+                samples := 0

-            for _, ts := range series.timeSeries {
-                samples += len(ts.Samples)
-            }
+                for _, ts := range series.timeSeries {
+                    samples += len(ts.Samples)
+                }

-            if st, ok := stats[tenant]; ok {
-                st.timeseries += len(series.timeSeries)
-                st.totalSamples += samples
+                if st, ok := stats[tenant]; ok {
+                    st.timeseries += len(series.timeSeries)
+                    st.totalSamples += samples

-                stats[tenant] = st
-            } else {
-                stats[tenant] = requestStats{
-                    timeseries:   len(series.timeSeries),
-                    totalSamples: samples,
+                    stats[tenant] = st
+                } else {
+                    stats[tenant] = requestStats{
+                        timeseries:   len(series.timeSeries),
+                        totalSamples: samples,
+                    }
                 }
             }
         }
@@ -746,6 +748,7 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) (
     ctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), ctx), h.options.ForwardTimeout)

     var writeErrors writeErrors
+    var stats tenantRequestStats = make(tenantRequestStats)

     defer func() {
         if writeErrors.ErrOrNil() != nil {
@@ -765,11 +768,10 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) (
     localWrites, remoteWrites, err := h.distributeTimeseriesToReplicas(params.tenant, params.replicas, params.writeRequest.Timeseries)
     if err != nil {
         level.Error(requestLogger).Log("msg", "failed to distribute timeseries to replicas", "err", err)
-        return tenantRequestStats{}, err
+        return stats, err
     }

-    // Specific to Databricks setup, we only measure remote writes
-    stats := h.gatherWriteStats(remoteWrites)
+    stats = h.gatherWriteStats(localWrites, remoteWrites)

     // Prepare a buffered channel to receive the responses from the local and remote writes. Remote writes will all go
     // asynchronously and with this capacity we will never block on writing to the channel.
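
For reference, a trimmed-down, self-contained sketch of the variadic stats
gathering above, so both local and remote write groups are counted per
tenant (the types here are simplified stand-ins, not the real handler
types):

// Simplified sketch of the variadic per-tenant stats accumulation; not the
// actual handler types.
package main

import "fmt"

type endpointReplica struct {
	endpoint string
	replica  uint64
}

type trackedSeries struct {
	numSeries  int
	numSamples int
}

type requestStats struct {
	timeseries   int
	totalSamples int
}

func gatherWriteStats(writes ...map[endpointReplica]map[string]trackedSeries) map[string]requestStats {
	stats := make(map[string]requestStats)
	for _, write := range writes {
		for er, tenants := range write {
			if er.replica != 0 {
				continue // Skip replicated writes, only count once.
			}
			for tenant, series := range tenants {
				st := stats[tenant]
				st.timeseries += series.numSeries
				st.totalSamples += series.numSamples
				stats[tenant] = st
			}
		}
	}
	return stats
}

func main() {
	local := map[endpointReplica]map[string]trackedSeries{
		{endpoint: "self", replica: 0}: {"team-foo": {numSeries: 2, numSamples: 20}},
	}
	remote := map[endpointReplica]map[string]trackedSeries{
		{endpoint: "receiver-1", replica: 0}: {"team-bar": {numSeries: 1, numSamples: 5}},
		{endpoint: "receiver-2", replica: 1}: {"team-bar": {numSeries: 1, numSamples: 5}},
	}
	fmt.Printf("%+v\n", gatherWriteStats(local, remote))
}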
