diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b7d6cd294e..8c6ef219b50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,7 @@ * [CHANGE] Query-frontend: Remove deprecated `frontend.align_queries_with_step` YAML configuration. The configuration option has been moved to per-tenant and default `limits` since Mimir 2.12. #8733 #8735 * [CHANGE] Store-gateway: Change default of `-blocks-storage.bucket-store.max-concurrent` to 200. #8768 * [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8671 -* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 +* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750 * What it is: * When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path. * New configuration options: diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index b7dc88ac2ea..f40a9a62e04 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -6643,7 +6643,7 @@ "kind": "field", "name": "producer_max_buffered_bytes", "required": false, - "desc": "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. 0 to disable the limit.", + "desc": "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.", "fieldValue": null, "fieldDefaultValue": 1073741824, "fieldFlag": "ingest-storage.kafka.producer-max-buffered-bytes", diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index 0a24955f120..e2ad3e303e6 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -1342,7 +1342,7 @@ Usage of ./cmd/mimir/mimir: -ingest-storage.kafka.max-consumer-lag-at-startup duration The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. 
Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s) -ingest-storage.kafka.producer-max-buffered-bytes int - The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. 0 to disable the limit. (default 1073741824) + The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824) -ingest-storage.kafka.producer-max-record-size-bytes int The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616) -ingest-storage.kafka.target-consumer-lag-at-startup duration diff --git a/cmd/mimir/help.txt.tmpl b/cmd/mimir/help.txt.tmpl index f1d0db3ddc9..703ad5b7b5c 100644 --- a/cmd/mimir/help.txt.tmpl +++ b/cmd/mimir/help.txt.tmpl @@ -416,7 +416,7 @@ Usage of ./cmd/mimir/mimir: -ingest-storage.kafka.max-consumer-lag-at-startup duration The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s) -ingest-storage.kafka.producer-max-buffered-bytes int - The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. 0 to disable the limit. (default 1073741824) + The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824) -ingest-storage.kafka.producer-max-record-size-bytes int The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616) -ingest-storage.kafka.target-consumer-lag-at-startup duration diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 8e3a44890c7..23492cd634a 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -3827,7 +3827,7 @@ kafka: # The maximum size of (uncompressed) buffered and unacknowledged produced # records sent to Kafka. The produce request fails once this limit is reached. - # 0 to disable the limit. + # This limit is per Kafka client. 0 to disable the limit. 
# CLI flag: -ingest-storage.kafka.producer-max-buffered-bytes [producer_max_buffered_bytes: | default = 1073741824] diff --git a/pkg/storage/ingest/config.go b/pkg/storage/ingest/config.go index f22537cf60d..2ea46e57c1b 100644 --- a/pkg/storage/ingest/config.go +++ b/pkg/storage/ingest/config.go @@ -85,8 +85,8 @@ type KafkaConfig struct { AutoCreateTopicEnabled bool `yaml:"auto_create_topic_enabled"` AutoCreateTopicDefaultPartitions int `yaml:"auto_create_topic_default_partitions"` - ProducerMaxRecordSizeBytes int `yaml:"producer_max_record_size_bytes"` - ProducerMaxBufferedBytes int `yaml:"producer_max_buffered_bytes"` + ProducerMaxRecordSizeBytes int `yaml:"producer_max_record_size_bytes"` + ProducerMaxBufferedBytes int64 `yaml:"producer_max_buffered_bytes"` WaitStrongReadConsistencyTimeout time.Duration `yaml:"wait_strong_read_consistency_timeout"` @@ -123,7 +123,7 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) f.IntVar(&cfg.AutoCreateTopicDefaultPartitions, prefix+".auto-create-topic-default-partitions", 0, "When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Mimir component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Mimir proceeds anyways, but auto-created topics could have an incorrect number of partitions.") f.IntVar(&cfg.ProducerMaxRecordSizeBytes, prefix+".producer-max-record-size-bytes", maxProducerRecordDataBytesLimit, "The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes.") - f.IntVar(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. 0 to disable the limit.") + f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.") f.DurationVar(&cfg.WaitStrongReadConsistencyTimeout, prefix+".wait-strong-read-consistency-timeout", 20*time.Second, "The maximum allowed for a read requests processed by an ingester to wait until strong read consistency is enforced. 0 to disable the timeout.") } diff --git a/pkg/storage/ingest/writer.go b/pkg/storage/ingest/writer.go index e162275ac2e..b01f8ca0017 100644 --- a/pkg/storage/ingest/writer.go +++ b/pkg/storage/ingest/writer.go @@ -5,6 +5,7 @@ package ingest import ( "context" "fmt" + "strconv" "sync" "time" @@ -15,7 +16,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" - "go.uber.org/atomic" "github.com/grafana/mimir/pkg/mimirpb" "github.com/grafana/mimir/pkg/util/globalerror" @@ -37,6 +37,8 @@ const ( // in the worst case scenario, which is expected to be way above the actual one. 
maxProducerRecordDataBytesLimit = producerBatchMaxBytes - 16384 minProducerRecordDataBytesLimit = 1024 * 1024 + + writerMetricsPrefix = "cortex_ingest_storage_writer_" ) var ( @@ -59,18 +61,12 @@ type Writer struct { // We support multiple Kafka clients to better parallelize the workload. The number of // clients is fixed during the Writer lifecycle, but they're initialised lazily. writersMx sync.RWMutex - writers []*kafkaWriterClient - - // Keep track of Kafka records size (bytes) currently in-flight in the Kafka client. - // This counter is used to implement a limit on the max buffered bytes. - writersBufferedBytes *atomic.Int64 + writers []*KafkaProducer // Metrics. - writeRequestsTotal prometheus.Counter - writeFailuresTotal *prometheus.CounterVec - writeLatency prometheus.Histogram - writeBytesTotal prometheus.Counter - recordsPerRequest prometheus.Histogram + writeLatency prometheus.Histogram + writeBytesTotal prometheus.Counter + recordsPerRequest prometheus.Histogram // The following settings can only be overridden in tests. maxInflightProduceRequests int @@ -81,19 +77,10 @@ func NewWriter(kafkaCfg KafkaConfig, logger log.Logger, reg prometheus.Registere kafkaCfg: kafkaCfg, logger: logger, registerer: reg, - writers: make([]*kafkaWriterClient, kafkaCfg.WriteClients), + writers: make([]*KafkaProducer, kafkaCfg.WriteClients), maxInflightProduceRequests: 20, - writersBufferedBytes: atomic.NewInt64(0), // Metrics. - writeRequestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ - Name: "cortex_ingest_storage_writer_produce_requests_total", - Help: "Total number of produce requests issued to Kafka.", - }), - writeFailuresTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ingest_storage_writer_produce_failures_total", - Help: "Total number of failed produce requests issued to Kafka.", - }, []string{"reason"}), writeLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ Name: "cortex_ingest_storage_writer_latency_seconds", Help: "Latency to write an incoming request to the ingest storage.", @@ -168,7 +155,7 @@ func (w *Writer) WriteSync(ctx context.Context, partitionID int32, userID string // visibility over this metric if records are rejected by Kafka because of MESSAGE_TOO_LARGE). w.recordsPerRequest.Observe(float64(len(records))) - res := w.produceSync(ctx, writer, records) + res := writer.ProduceSync(ctx, records) // Track latency only for successfully written records. if count, sizeBytes := successfulProduceRecordsStats(res); count > 0 { @@ -187,69 +174,7 @@ func (w *Writer) WriteSync(ctx context.Context, partitionID int32, userID string return nil } -// produceSync produces records to Kafka and returns once all records have been successfully committed, -// or an error occurred. 
-func (w *Writer) produceSync(ctx context.Context, client *kafkaWriterClient, records []*kgo.Record) kgo.ProduceResults { - var ( - remaining = atomic.NewInt64(int64(len(records))) - done = make(chan struct{}) - resMx sync.Mutex - res = make(kgo.ProduceResults, 0, len(records)) - maxBufferedBytes = int64(w.kafkaCfg.ProducerMaxBufferedBytes) - ) - - w.writeRequestsTotal.Add(float64(len(records))) - - onProduceDone := func(r *kgo.Record, err error) { - if maxBufferedBytes > 0 { - w.writersBufferedBytes.Add(-int64(len(r.Value))) - } - - resMx.Lock() - res = append(res, kgo.ProduceResult{Record: r, Err: err}) - resMx.Unlock() - - if err != nil { - w.writeFailuresTotal.WithLabelValues(produceErrReason(err)).Inc() - } - - // In case of error we'll wait for all responses anyway before returning from produceSync(). - // It allows us to keep code easier, given we don't expect this function to be frequently - // called with multiple records. - if remaining.Dec() == 0 { - close(done) - } - } - - for _, record := range records { - // Fast fail if the Kafka client buffer is full. Buffered bytes counter is decreased onProducerDone(). - if maxBufferedBytes > 0 && w.writersBufferedBytes.Add(int64(len(record.Value))) > maxBufferedBytes { - onProduceDone(record, kgo.ErrMaxBuffered) - continue - } - - // We use a new context to avoid that other Produce() may be cancelled when this call's context is - // canceled. It's important to note that cancelling the context passed to Produce() doesn't actually - // prevent the data to be sent over the wire (because it's never removed from the buffer) but in some - // cases may cause all requests to fail with context cancelled. - // - // Produce() may theoretically block if the buffer is full, but we configure the Kafka client with - // unlimited buffer because we implement the buffer limit ourselves (see maxBufferedBytes). This means - // Produce() should never block for us in practice. - client.Produce(context.WithoutCancel(ctx), record, onProduceDone) - } - - // Wait for a response or until the context has done. - select { - case <-ctx.Done(): - return kgo.ProduceResults{{Err: context.Cause(ctx)}} - case <-done: - // Once we're done, it's guaranteed that no more results will be appended, so we can safely return it. - return res - } -} - -func (w *Writer) getKafkaWriterForPartition(partitionID int32) (*kafkaWriterClient, error) { +func (w *Writer) getKafkaWriterForPartition(partitionID int32) (*KafkaProducer, error) { // Check if the writer has already been created. w.writersMx.RLock() clientID := int(partitionID) % len(w.writers) @@ -268,10 +193,21 @@ func (w *Writer) getKafkaWriterForPartition(partitionID int32) (*kafkaWriterClie if writer != nil { return writer, nil } - newWriter, err := newKafkaWriterClient(clientID, w.kafkaCfg, w.maxInflightProduceRequests, w.logger, w.registerer) + + // Add the client ID to metrics so that they don't clash when the Writer is configured + // to run with multiple Kafka clients. + clientReg := prometheus.WrapRegistererWithPrefix(writerMetricsPrefix, + prometheus.WrapRegistererWith(prometheus.Labels{"client_id": strconv.Itoa(clientID)}, w.registerer)) + + // Add the client ID to logger so that we can easily distinguish Kafka clients in logs. 
+ clientLogger := log.With(w.logger, "client_id", clientID) + + newClient, err := newKafkaWriterClient(w.kafkaCfg, w.maxInflightProduceRequests, clientLogger, clientReg) if err != nil { return nil, err } + + newWriter := NewKafkaProducer(newClient, w.kafkaCfg.ProducerMaxBufferedBytes, clientReg) w.writers[clientID] = newWriter return newWriter, nil } @@ -341,21 +277,3 @@ func successfulProduceRecordsStats(results kgo.ProduceResults) (count, sizeBytes return } - -func produceErrReason(err error) string { - if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, kgo.ErrRecordTimeout) { - return "timeout" - } - if errors.Is(err, kgo.ErrMaxBuffered) { - return "buffer-full" - } - if errors.Is(err, kerr.MessageTooLarge) { - return "record-too-large" - } - if errors.Is(err, context.Canceled) { - // This should never happen because we don't cancel produce requests, however we - // check this error anyway to detect if something unexpected happened. - return "canceled" - } - return "other" -} diff --git a/pkg/storage/ingest/writer_client.go b/pkg/storage/ingest/writer_client.go index edd1e346f59..ae638ff306f 100644 --- a/pkg/storage/ingest/writer_client.go +++ b/pkg/storage/ingest/writer_client.go @@ -3,35 +3,29 @@ package ingest import ( + "context" + "errors" "math" - "strconv" "sync" "time" "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/twmb/franz-go/pkg/kerr" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/plugin/kprom" + "go.uber.org/atomic" ) -type kafkaWriterClient struct { - *kgo.Client - - closeOnce *sync.Once - closed chan struct{} - - // Custom metrics. - bufferedProduceBytes prometheus.Summary - bufferedProduceBytesLimit prometheus.Gauge -} - -func newKafkaWriterClient(clientID int, kafkaCfg KafkaConfig, maxInflightProduceRequests int, logger log.Logger, reg prometheus.Registerer) (*kafkaWriterClient, error) { - logger = log.With(logger, "client_id", clientID) - reg = prometheus.WrapRegistererWith(prometheus.Labels{"client_id": strconv.Itoa(clientID)}, reg) - +// newKafkaWriterClient returns the kgo.Client that should be used by the Writer. +// +// The input prometheus.Registerer must be wrapped with a prefix (the names of metrics +// registered don't have a prefix). +func newKafkaWriterClient(kafkaCfg KafkaConfig, maxInflightProduceRequests int, logger log.Logger, reg prometheus.Registerer) (*kgo.Client, error) { // Do not export the client ID, because we use it to specify options to the backend. - metrics := kprom.NewMetrics("cortex_ingest_storage_writer", + metrics := kprom.NewMetrics( + "", // No prefix. We expect the input prometheus.Registered to be wrapped with a prefix. kprom.Registerer(reg), kprom.FetchAndProduceDetail(kprom.Batches, kprom.Records, kprom.CompressedBytes, kprom.UncompressedBytes)) @@ -84,19 +78,46 @@ func newKafkaWriterClient(clientID int, kafkaCfg KafkaConfig, maxInflightProduce kgo.MaxBufferedBytes(0), ) - kafkaClient, err := kgo.NewClient(opts...) - if err != nil { - return nil, err - } + return kgo.NewClient(opts...) +} - customClient := &kafkaWriterClient{ - Client: kafkaClient, - closeOnce: &sync.Once{}, - closed: make(chan struct{}), +// KafkaProducer is a kgo.Client wrapper exposing some higher level features and metrics useful for producers. +type KafkaProducer struct { + *kgo.Client + + closeOnce *sync.Once + closed chan struct{} + + // Keep track of Kafka records size (bytes) currently in-flight in the Kafka client. 
+	// This counter is used to implement a limit on the max buffered bytes.
+	bufferedBytes *atomic.Int64
+
+	// The max buffered bytes allowed. Once this limit is reached, produce requests fail.
+	maxBufferedBytes int64
+	// Custom metrics.
+	bufferedProduceBytes      prometheus.Summary
+	bufferedProduceBytesLimit prometheus.Gauge
+	produceRequestsTotal      prometheus.Counter
+	produceFailuresTotal      *prometheus.CounterVec
+}
+
+// NewKafkaProducer returns a new KafkaProducer.
+//
+// The input prometheus.Registerer must be wrapped with a prefix (the names of metrics
+// registered don't have a prefix).
+func NewKafkaProducer(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Registerer) *KafkaProducer {
+	producer := &KafkaProducer{
+		Client:           client,
+		closeOnce:        &sync.Once{},
+		closed:           make(chan struct{}),
+		bufferedBytes:    atomic.NewInt64(0),
+		maxBufferedBytes: maxBufferedBytes,
+
+		// Metrics.
 		bufferedProduceBytes: promauto.With(reg).NewSummary(
 			prometheus.SummaryOpts{
-				Name:       "cortex_ingest_storage_writer_buffered_produce_bytes",
+				Name:       "buffered_produce_bytes",
 				Help:       "The buffered produce records in bytes. Quantile buckets keep track of buffered records size over the last 60s.",
 				Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001, 1: 0.001},
 				MaxAge:     time.Minute,
@@ -104,19 +125,27 @@ func newKafkaWriterClient(clientID int, kafkaCfg KafkaConfig, maxInflightProduce
 			}),
 		bufferedProduceBytesLimit: promauto.With(reg).NewGauge(
 			prometheus.GaugeOpts{
-				Name: "cortex_ingest_storage_writer_buffered_produce_bytes_limit",
+				Name: "buffered_produce_bytes_limit",
 				Help: "The bytes limit on buffered produce records. Produce requests fail once this limit is reached.",
 			}),
+		produceRequestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+			Name: "produce_requests_total",
+			Help: "Total number of produce requests issued to Kafka.",
+		}),
+		produceFailuresTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
+			Name: "produce_failures_total",
+			Help: "Total number of failed produce requests issued to Kafka.",
+		}, []string{"reason"}),
 	}
 
-	customClient.bufferedProduceBytesLimit.Set(float64(kafkaCfg.ProducerMaxBufferedBytes))
+	producer.bufferedProduceBytesLimit.Set(float64(maxBufferedBytes))
 
-	go customClient.updateMetricsLoop()
+	go producer.updateMetricsLoop()
 
-	return customClient, nil
+	return producer
 }
 
-func (c *kafkaWriterClient) Close() {
+func (c *KafkaProducer) Close() {
 	c.closeOnce.Do(func() {
 		close(c.closed)
 	})
@@ -124,7 +153,7 @@ func newKafkaWriterClient(clientID int, kafkaCfg KafkaConfig, maxInflightProduce
 	c.Client.Close()
 }
 
-func (c *kafkaWriterClient) updateMetricsLoop() {
+func (c *KafkaProducer) updateMetricsLoop() {
 	// We observe buffered produce bytes and at regular intervals, to have a good
 	// approximation of the peak value reached over the observation period.
 	ticker := time.NewTicker(250 * time.Millisecond)
@@ -139,3 +168,85 @@ func (c *kafkaWriterClient) updateMetricsLoop() {
 		}
 	}
 }
+
+// ProduceSync produces records to Kafka and returns once all records have been successfully committed,
+// or an error occurs.
+//
+// This function honors the configured max buffered bytes and refuses to produce a record, returning kgo.ErrMaxBuffered,
+// if the configured limit is reached.
+func (c *KafkaProducer) ProduceSync(ctx context.Context, records []*kgo.Record) kgo.ProduceResults {
+	var (
+		remaining = atomic.NewInt64(int64(len(records)))
+		done      = make(chan struct{})
+		resMx     sync.Mutex
+		res       = make(kgo.ProduceResults, 0, len(records))
+	)
+
+	c.produceRequestsTotal.Add(float64(len(records)))
+
+	onProduceDone := func(r *kgo.Record, err error) {
+		if c.maxBufferedBytes > 0 {
+			c.bufferedBytes.Add(-int64(len(r.Value)))
+		}
+
+		resMx.Lock()
+		res = append(res, kgo.ProduceResult{Record: r, Err: err})
+		resMx.Unlock()
+
+		if err != nil {
+			c.produceFailuresTotal.WithLabelValues(produceErrReason(err)).Inc()
+		}
+
+		// In case of error we'll wait for all responses anyway before returning from ProduceSync().
+		// It keeps the code simpler, given we don't expect this function to be frequently
+		// called with multiple records.
+		if remaining.Dec() == 0 {
+			close(done)
+		}
+	}
+
+	for _, record := range records {
+		// Fast fail if the Kafka client buffer is full. The buffered bytes counter is decreased in onProduceDone().
+		if c.maxBufferedBytes > 0 && c.bufferedBytes.Add(int64(len(record.Value))) > c.maxBufferedBytes {
+			onProduceDone(record, kgo.ErrMaxBuffered)
+			continue
+		}
+
+		// We use a new context so that other Produce() calls aren't cancelled when this call's context is
+		// canceled. It's important to note that cancelling the context passed to Produce() doesn't actually
+		// prevent the data from being sent over the wire (because it's never removed from the buffer), but in some
+		// cases it may cause all requests to fail with a context cancelled error.
+		//
+		// Produce() may theoretically block if the buffer is full, but we configure the Kafka client with
+		// unlimited buffer because we implement the buffer limit ourselves (see maxBufferedBytes). This means
+		// Produce() should never block for us in practice.
+		c.Client.Produce(context.WithoutCancel(ctx), record, onProduceDone)
+	}
+
+	// Wait for all responses or until the context is done.
+	select {
+	case <-ctx.Done():
+		return kgo.ProduceResults{{Err: context.Cause(ctx)}}
+	case <-done:
+		// Once we're done, it's guaranteed that no more results will be appended, so we can safely return it.
+		return res
+	}
+}
+
+func produceErrReason(err error) string {
+	if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, kgo.ErrRecordTimeout) {
+		return "timeout"
+	}
+	if errors.Is(err, kgo.ErrMaxBuffered) {
+		return "buffer-full"
+	}
+	if errors.Is(err, kerr.MessageTooLarge) {
+		return "record-too-large"
+	}
+	if errors.Is(err, context.Canceled) {
+		// This should never happen because we don't cancel produce requests, however we
+		// check this error anyway to detect if something unexpected happened.
+ return "canceled" + } + return "other" +} diff --git a/pkg/storage/ingest/writer_client_test.go b/pkg/storage/ingest/writer_client_test.go index 1b411660bb8..338d6a6efab 100644 --- a/pkg/storage/ingest/writer_client_test.go +++ b/pkg/storage/ingest/writer_client_test.go @@ -21,7 +21,7 @@ import ( "github.com/grafana/mimir/pkg/util/testkafka" ) -func TestKafkaWriterClient_ShouldExposeBufferedBytesLimit(t *testing.T) { +func TestKafkaProducer_ShouldExposeBufferedBytesLimit(t *testing.T) { const ( numPartitions = 1 topicName = "test" @@ -32,18 +32,22 @@ func TestKafkaWriterClient_ShouldExposeBufferedBytesLimit(t *testing.T) { cfg.ProducerMaxBufferedBytes = 1024 * 1024 reg := prometheus.NewPedanticRegistry() - client, err := newKafkaWriterClient(1, cfg, 1, log.NewNopLogger(), reg) + prefixedReg := prometheus.WrapRegistererWithPrefix(writerMetricsPrefix, reg) + + client, err := newKafkaWriterClient(cfg, 1, log.NewNopLogger(), prefixedReg) require.NoError(t, err) - t.Cleanup(client.Close) + + producer := NewKafkaProducer(client, cfg.ProducerMaxBufferedBytes, prefixedReg) + t.Cleanup(producer.Close) assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` # HELP cortex_ingest_storage_writer_buffered_produce_bytes_limit The bytes limit on buffered produce records. Produce requests fail once this limit is reached. # TYPE cortex_ingest_storage_writer_buffered_produce_bytes_limit gauge - cortex_ingest_storage_writer_buffered_produce_bytes_limit{client_id="1"} 1.048576e+06 + cortex_ingest_storage_writer_buffered_produce_bytes_limit 1.048576e+06 `), "cortex_ingest_storage_writer_buffered_produce_bytes_limit")) } -func TestKafkaWriterClient_ShouldTrackBufferedProduceBytes(t *testing.T) { +func TestKafkaProducer_ProduceSync_ShouldTrackBufferedProduceBytes(t *testing.T) { const ( numPartitions = 1 topicName = "test" @@ -61,9 +65,13 @@ func TestKafkaWriterClient_ShouldTrackBufferedProduceBytes(t *testing.T) { ctx := context.Background() cfg := createTestKafkaConfig(clusterAddr, topicName) reg := prometheus.NewPedanticRegistry() - client, err := newKafkaWriterClient(1, cfg, 1, log.NewNopLogger(), reg) + prefixedReg := prometheus.WrapRegistererWithPrefix(writerMetricsPrefix, reg) + + client, err := newKafkaWriterClient(cfg, 1, log.NewNopLogger(), prefixedReg) require.NoError(t, err) - t.Cleanup(client.Close) + + producer := NewKafkaProducer(client, cfg.ProducerMaxBufferedBytes, prefixedReg) + t.Cleanup(producer.Close) wg := sync.WaitGroup{} @@ -106,7 +114,7 @@ func TestKafkaWriterClient_ShouldTrackBufferedProduceBytes(t *testing.T) { wg.Wait() } -func getSummaryQuantileValue(t require.TestingT, reg *prometheus.Registry, metricName string, quantile float64) float64 { +func getSummaryQuantileValue(t require.TestingT, reg prometheus.Gatherer, metricName string, quantile float64) float64 { const delta = 0.0001 metrics, err := reg.Gather() diff --git a/pkg/storage/ingest/writer_test.go b/pkg/storage/ingest/writer_test.go index 192ac3a1b02..784200a6dc8 100644 --- a/pkg/storage/ingest/writer_test.go +++ b/pkg/storage/ingest/writer_test.go @@ -114,7 +114,7 @@ func TestWriter_WriteSync(t *testing.T) { # HELP cortex_ingest_storage_writer_produce_requests_total Total number of produce requests issued to Kafka. 
# TYPE cortex_ingest_storage_writer_produce_requests_total counter - cortex_ingest_storage_writer_produce_requests_total 1 + cortex_ingest_storage_writer_produce_requests_total{client_id="0"} 1 `, len(fetches.Records()[0].Value))), "cortex_ingest_storage_writer_sent_bytes_total", "cortex_ingest_storage_writer_records_per_write_request", @@ -204,7 +204,7 @@ func TestWriter_WriteSync(t *testing.T) { # HELP cortex_ingest_storage_writer_produce_requests_total Total number of produce requests issued to Kafka. # TYPE cortex_ingest_storage_writer_produce_requests_total counter - cortex_ingest_storage_writer_produce_requests_total 2 + cortex_ingest_storage_writer_produce_requests_total{client_id="0"} 2 `, expectedBytes)), "cortex_ingest_storage_writer_sent_bytes_total", "cortex_ingest_storage_writer_records_per_write_request", @@ -228,7 +228,7 @@ func TestWriter_WriteSync(t *testing.T) { _, clusterAddr := testkafka.CreateCluster(t, numPartitions, topicName) config := createTestKafkaConfig(clusterAddr, topicName) config.WriteClients = writeClients - writer, _ := createTestWriter(t, config) + writer, reg := createTestWriter(t, config) // Write to partitions. for partitionID, series := range seriesPerPartition { @@ -259,6 +259,24 @@ func TestWriter_WriteSync(t *testing.T) { assert.Equal(t, expected.Samples, received.Timeseries[idx].Samples) } } + + // Check metrics. The actual metrics depends on how many clients we have, so we're just checking it for + // the case of 1 and 2 clients. + if writeClients == 1 { + assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_ingest_storage_writer_produce_requests_total Total number of produce requests issued to Kafka. + # TYPE cortex_ingest_storage_writer_produce_requests_total counter + cortex_ingest_storage_writer_produce_requests_total{client_id="0"} 2 + `), "cortex_ingest_storage_writer_produce_requests_total")) + } + if writeClients == 2 { + assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` + # HELP cortex_ingest_storage_writer_produce_requests_total Total number of produce requests issued to Kafka. + # TYPE cortex_ingest_storage_writer_produce_requests_total counter + cortex_ingest_storage_writer_produce_requests_total{client_id="0"} 1 + cortex_ingest_storage_writer_produce_requests_total{client_id="1"} 1 + `), "cortex_ingest_storage_writer_produce_requests_total")) + } }) } }) @@ -415,11 +433,11 @@ func TestWriter_WriteSync(t *testing.T) { assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` # HELP cortex_ingest_storage_writer_produce_requests_total Total number of produce requests issued to Kafka. # TYPE cortex_ingest_storage_writer_produce_requests_total counter - cortex_ingest_storage_writer_produce_requests_total 1 + cortex_ingest_storage_writer_produce_requests_total{client_id="0"} 1 # HELP cortex_ingest_storage_writer_produce_failures_total Total number of failed produce requests issued to Kafka. # TYPE cortex_ingest_storage_writer_produce_failures_total counter - cortex_ingest_storage_writer_produce_failures_total{reason="other"} 1 + cortex_ingest_storage_writer_produce_failures_total{client_id="0",reason="other"} 1 `), "cortex_ingest_storage_writer_produce_requests_total", "cortex_ingest_storage_writer_produce_failures_total")) @@ -449,11 +467,11 @@ func TestWriter_WriteSync(t *testing.T) { assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` # HELP cortex_ingest_storage_writer_produce_requests_total Total number of produce requests issued to Kafka. 
# TYPE cortex_ingest_storage_writer_produce_requests_total counter - cortex_ingest_storage_writer_produce_requests_total 1 + cortex_ingest_storage_writer_produce_requests_total{client_id="0"} 1 # HELP cortex_ingest_storage_writer_produce_failures_total Total number of failed produce requests issued to Kafka. # TYPE cortex_ingest_storage_writer_produce_failures_total counter - cortex_ingest_storage_writer_produce_failures_total{reason="timeout"} 1 + cortex_ingest_storage_writer_produce_failures_total{client_id="0",reason="timeout"} 1 `), "cortex_ingest_storage_writer_produce_requests_total", "cortex_ingest_storage_writer_produce_failures_total")) @@ -599,11 +617,11 @@ func TestWriter_WriteSync(t *testing.T) { # HELP cortex_ingest_storage_writer_produce_requests_total Total number of produce requests issued to Kafka. # TYPE cortex_ingest_storage_writer_produce_requests_total counter - cortex_ingest_storage_writer_produce_requests_total 2 + cortex_ingest_storage_writer_produce_requests_total{client_id="0"} 2 # HELP cortex_ingest_storage_writer_produce_failures_total Total number of failed produce requests issued to Kafka. # TYPE cortex_ingest_storage_writer_produce_failures_total counter - cortex_ingest_storage_writer_produce_failures_total{reason="record-too-large"} 1 + cortex_ingest_storage_writer_produce_failures_total{client_id="0",reason="record-too-large"} 1 `, len(fetches.Records()[0].Value))), "cortex_ingest_storage_writer_sent_bytes_total", "cortex_ingest_storage_writer_records_per_write_request", @@ -638,7 +656,7 @@ func TestWriter_WriteSync(t *testing.T) { cluster, clusterAddr := testkafka.CreateCluster(t, numPartitions, topicName) cfg := createTestKafkaConfig(clusterAddr, topicName) - cfg.ProducerMaxBufferedBytes = (estimatedRecordSize * 4) - 1 // Configure the test so that we expect 3 produced records. + cfg.ProducerMaxBufferedBytes = int64((estimatedRecordSize * 4) - 1) // Configure the test so that we expect 3 produced records. cfg.WriteTimeout = time.Second // Pre-condition checks. @@ -718,12 +736,12 @@ func TestWriter_WriteSync(t *testing.T) { assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` # HELP cortex_ingest_storage_writer_produce_requests_total Total number of produce requests issued to Kafka. # TYPE cortex_ingest_storage_writer_produce_requests_total counter - cortex_ingest_storage_writer_produce_requests_total 10 + cortex_ingest_storage_writer_produce_requests_total{client_id="0"} 10 # HELP cortex_ingest_storage_writer_produce_failures_total Total number of failed produce requests issued to Kafka. # TYPE cortex_ingest_storage_writer_produce_failures_total counter - cortex_ingest_storage_writer_produce_failures_total{reason="buffer-full"} 7 - cortex_ingest_storage_writer_produce_failures_total{reason="timeout"} 3 + cortex_ingest_storage_writer_produce_failures_total{client_id="0",reason="buffer-full"} 7 + cortex_ingest_storage_writer_produce_failures_total{client_id="0",reason="timeout"} 3 `), "cortex_ingest_storage_writer_produce_requests_total", "cortex_ingest_storage_writer_produce_failures_total")) @@ -748,12 +766,12 @@ func TestWriter_WriteSync(t *testing.T) { assert.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(` # HELP cortex_ingest_storage_writer_produce_requests_total Total number of produce requests issued to Kafka. 
# TYPE cortex_ingest_storage_writer_produce_requests_total counter - cortex_ingest_storage_writer_produce_requests_total 13 + cortex_ingest_storage_writer_produce_requests_total{client_id="0"} 13 # HELP cortex_ingest_storage_writer_produce_failures_total Total number of failed produce requests issued to Kafka. # TYPE cortex_ingest_storage_writer_produce_failures_total counter - cortex_ingest_storage_writer_produce_failures_total{reason="buffer-full"} 7 - cortex_ingest_storage_writer_produce_failures_total{reason="timeout"} 3 + cortex_ingest_storage_writer_produce_failures_total{client_id="0",reason="buffer-full"} 7 + cortex_ingest_storage_writer_produce_failures_total{client_id="0",reason="timeout"} 3 `), "cortex_ingest_storage_writer_produce_requests_total", "cortex_ingest_storage_writer_produce_failures_total")) @@ -845,7 +863,9 @@ func TestWriter_WriteSync_HighConcurrencyOnKafkaClientBufferFull(t *testing.T) { require.NotZero(t, writeFailureCount.Load()) // We expect the buffered bytes to get down to 0 once all write requests completed. - require.Zero(t, writer.writersBufferedBytes.Load()) + producer, err := writer.getKafkaWriterForPartition(partitionID) + require.NoError(t, err) + require.Zero(t, producer.bufferedBytes.Load()) } func TestMarshalWriteRequestToRecords(t *testing.T) {
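Note: the produce-path change above boils down to a per-client byte budget enforced around asynchronous produces — bytes are reserved with an atomic counter before a record is handed to the client, the produce fails fast once the budget is exceeded, and the reservation is released in the completion callback. The sketch below is a minimal, self-contained illustration of that pattern and is not code from this patch: `byteBudgetProducer`, `produceSync`, `send` and `errMaxBuffered` are hypothetical stand-ins for `KafkaProducer`, `ProduceSync`, the franz-go `Produce()` call and `kgo.ErrMaxBuffered`.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// errMaxBuffered plays the role of kgo.ErrMaxBuffered in the patch: it is
// returned when a record would push the per-client buffer over its budget.
var errMaxBuffered = errors.New("max buffered bytes reached")

// byteBudgetProducer is a simplified, hypothetical stand-in for KafkaProducer:
// it tracks in-flight record bytes with an atomic counter and refuses new
// records once the (optional) limit is reached.
type byteBudgetProducer struct {
	bufferedBytes    atomic.Int64
	maxBufferedBytes int64

	// send stands in for the real asynchronous Produce() call; it invokes
	// done(err) once the record has been acknowledged or has failed.
	send func(value []byte, done func(error))
}

// produceSync reserves bytes for every record up front, fails fast with
// errMaxBuffered when the budget is exceeded, and waits for all outcomes.
func (p *byteBudgetProducer) produceSync(values [][]byte) []error {
	var (
		wg   sync.WaitGroup
		mx   sync.Mutex
		errs = make([]error, 0, len(values))
	)

	onDone := func(reserved int64, err error) {
		if p.maxBufferedBytes > 0 {
			p.bufferedBytes.Add(-reserved) // Release the reservation.
		}
		mx.Lock()
		errs = append(errs, err)
		mx.Unlock()
		wg.Done()
	}

	for _, v := range values {
		wg.Add(1)
		size := int64(len(v))

		// Fast fail if this record would exceed the per-client budget.
		if p.maxBufferedBytes > 0 && p.bufferedBytes.Add(size) > p.maxBufferedBytes {
			onDone(size, errMaxBuffered)
			continue
		}

		p.send(v, func(err error) { onDone(size, err) })
	}

	wg.Wait()
	return errs
}

func main() {
	p := &byteBudgetProducer{
		maxBufferedBytes: 10,
		// A fake backend that always acknowledges immediately.
		send: func(_ []byte, done func(error)) { done(nil) },
	}

	errs := p.produceSync([][]byte{[]byte("ok"), []byte("this record is far too large")})
	fmt.Println(errs) // [<nil> max buffered bytes reached]
}
```

Because the limit is enforced by this reservation/release bookkeeping rather than inside the Kafka client, the real client can be configured with `kgo.MaxBufferedBytes(0)` (unlimited), as the diff's comment in `ProduceSync` explains.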