diff --git a/CHANGELOG.md b/CHANGELOG.md index 617f9c97ec..2a26715c0c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed +- [#7978](https://github.com/thanos-io/thanos/pull/7978) Receive: Fix deadlock during local writes when `split-tenant-label-name` is used. + ### Added - [#7907](https://github.com/thanos-io/thanos/pull/7907) Receive: Add `--receive.grpc-service-config` flag to configure gRPC service config for the receivers. diff --git a/docs/components/receive.md b/docs/components/receive.md index 38906489ba..fc0a64d98b 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -331,7 +331,7 @@ Please see the metric `thanos_receive_forward_delay_seconds` to see if you need The following formula is used for calculating quorum: -```go mdox-exec="sed -n '1012,1022p' pkg/receive/handler.go" +```go mdox-exec="sed -n '1015,1025p' pkg/receive/handler.go" // writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success. func (h *Handler) writeQuorum() int { // NOTE(GiedriusS): this is here because otherwise RF=2 doesn't make sense as all writes diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 163a29a2e1..86c90c30fa 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -765,7 +765,10 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) ( // Prepare a buffered channel to receive the responses from the local and remote writes. Remote writes will all go // asynchronously and with this capacity we will never block on writing to the channel. 
- maxBufferedResponses := len(localWrites) + var maxBufferedResponses int + for er := range localWrites { + maxBufferedResponses += len(localWrites[er]) + } for er := range remoteWrites { maxBufferedResponses += len(remoteWrites[er]) } diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 408e0869fb..f3a58e128f 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -1820,6 +1820,57 @@ func TestDistributeSeries(t *testing.T) { require.Equal(t, map[string]struct{}{"bar": {}, "boo": {}}, hr.seenTenants) } +func TestHandlerSplitTenantLabelLocalWrite(t *testing.T) { + const tenantIDLabelName = "thanos_tenant_id" + + appendable := &fakeAppendable{ + appender: newFakeAppender(nil, nil, nil), + } + + h := NewHandler(nil, &Options{ + Endpoint: "localhost", + SplitTenantLabelName: tenantIDLabelName, + ReceiverMode: RouterIngestor, + ReplicationFactor: 1, + ForwardTimeout: 1 * time.Second, + Writer: NewWriter( + log.NewNopLogger(), + newFakeTenantAppendable(appendable), + &WriterOptions{}, + ), + }) + + // initialize hashring with a single local endpoint matching the handler endpoint to force + // using local write + hashring, err := newSimpleHashring([]Endpoint{ + { + Address: h.options.Endpoint, + }, + }) + require.NoError(t, err) + hr := &hashringSeenTenants{Hashring: hashring} + h.Hashring(hr) + + response, err := h.RemoteWrite(context.Background(), &storepb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: labelpb.ZLabelsFromPromLabels( + labels.FromStrings("a", "b", tenantIDLabelName, "bar"), + ), + }, + { + Labels: labelpb.ZLabelsFromPromLabels( + labels.FromStrings("b", "a", tenantIDLabelName, "foo"), + ), + }, + }, + }) + + require.NoError(t, err) + require.NotNil(t, response) + require.Equal(t, map[string]struct{}{"bar": {}, "foo": {}}, hr.seenTenants) +} + func TestHandlerFlippingHashrings(t *testing.T) { t.Parallel()