Refactoring: extract ProduceSync() into a new KafkaProducer component (#8750)

Signed-off-by: Marco Pracucci <[email protected]>
pracucci authored Jul 20, 2024
1 parent 8a99254 commit e243636
Showing 10 changed files with 226 additions and 169 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -13,7 +13,7 @@
* [CHANGE] Query-frontend: Remove deprecated `frontend.align_queries_with_step` YAML configuration. The configuration option has been moved to per-tenant and default `limits` since Mimir 2.12. #8733 #8735
* [CHANGE] Store-gateway: Change default of `-blocks-storage.bucket-store.max-concurrent` to 200. #8768
* [FEATURE] Querier: add experimental streaming PromQL engine, enabled with `-querier.query-engine=mimir`. #8422 #8430 #8454 #8455 #8360 #8490 #8508 #8577 #8671
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738
* [FEATURE] Experimental Kafka-based ingest storage. #6888 #6894 #6929 #6940 #6951 #6974 #6982 #7029 #7030 #7091 #7142 #7147 #7148 #7153 #7160 #7193 #7349 #7376 #7388 #7391 #7393 #7394 #7402 #7404 #7423 #7424 #7437 #7486 #7503 #7508 #7540 #7621 #7682 #7685 #7694 #7695 #7696 #7697 #7701 #7733 #7734 #7741 #7752 #7838 #7851 #7871 #7877 #7880 #7882 #7887 #7891 #7925 #7955 #7967 #8031 #8063 #8077 #8088 #8135 #8176 #8184 #8194 #8216 #8217 #8222 #8233 #8503 #8542 #8579 #8657 #8686 #8688 #8703 #8706 #8708 #8738 #8750
* What it is:
* When the new ingest storage architecture is enabled, distributors write incoming write requests to a Kafka-compatible backend, and the ingesters asynchronously replay ingested data from Kafka. In this architecture, the write and read path are de-coupled through a Kafka-compatible backend. The write path and Kafka load is a function of the incoming write traffic, the read path load is a function of received queries. Whatever the load on the read path, it doesn't affect the write path.
* New configuration options:
2 changes: 1 addition & 1 deletion cmd/mimir/config-descriptor.json
@@ -6643,7 +6643,7 @@
"kind": "field",
"name": "producer_max_buffered_bytes",
"required": false,
"desc": "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. 0 to disable the limit.",
"desc": "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.",
"fieldValue": null,
"fieldDefaultValue": 1073741824,
"fieldFlag": "ingest-storage.kafka.producer-max-buffered-bytes",
2 changes: 1 addition & 1 deletion cmd/mimir/help-all.txt.tmpl
@@ -1342,7 +1342,7 @@ Usage of ./cmd/mimir/mimir:
-ingest-storage.kafka.max-consumer-lag-at-startup duration
The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
-ingest-storage.kafka.producer-max-buffered-bytes int
The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. 0 to disable the limit. (default 1073741824)
The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824)
-ingest-storage.kafka.producer-max-record-size-bytes int
The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616)
-ingest-storage.kafka.target-consumer-lag-at-startup duration
2 changes: 1 addition & 1 deletion cmd/mimir/help.txt.tmpl
@@ -416,7 +416,7 @@ Usage of ./cmd/mimir/mimir:
-ingest-storage.kafka.max-consumer-lag-at-startup duration
The guaranteed maximum lag before a consumer is considered to have caught up reading from a partition at startup, becomes ACTIVE in the hash ring and passes the readiness check. Set both -ingest-storage.kafka.target-consumer-lag-at-startup and -ingest-storage.kafka.max-consumer-lag-at-startup to 0 to disable waiting for maximum consumer lag being honored at startup. (default 15s)
-ingest-storage.kafka.producer-max-buffered-bytes int
The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. 0 to disable the limit. (default 1073741824)
The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit. (default 1073741824)
-ingest-storage.kafka.producer-max-record-size-bytes int
The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes. (default 15983616)
-ingest-storage.kafka.target-consumer-lag-at-startup duration
@@ -3827,7 +3827,7 @@ kafka:
# The maximum size of (uncompressed) buffered and unacknowledged produced
# records sent to Kafka. The produce request fails once this limit is reached.
# 0 to disable the limit.
# This limit is per Kafka client. 0 to disable the limit.
# CLI flag: -ingest-storage.kafka.producer-max-buffered-bytes
[producer_max_buffered_bytes: <int> | default = 1073741824]
6 changes: 3 additions & 3 deletions pkg/storage/ingest/config.go
@@ -85,8 +85,8 @@ type KafkaConfig struct {
AutoCreateTopicEnabled bool `yaml:"auto_create_topic_enabled"`
AutoCreateTopicDefaultPartitions int `yaml:"auto_create_topic_default_partitions"`

ProducerMaxRecordSizeBytes int `yaml:"producer_max_record_size_bytes"`
ProducerMaxBufferedBytes int `yaml:"producer_max_buffered_bytes"`
ProducerMaxRecordSizeBytes int `yaml:"producer_max_record_size_bytes"`
ProducerMaxBufferedBytes int64 `yaml:"producer_max_buffered_bytes"`

WaitStrongReadConsistencyTimeout time.Duration `yaml:"wait_strong_read_consistency_timeout"`

@@ -123,7 +123,7 @@ func (cfg *KafkaConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet)
f.IntVar(&cfg.AutoCreateTopicDefaultPartitions, prefix+".auto-create-topic-default-partitions", 0, "When auto-creation of Kafka topic is enabled and this value is positive, Kafka's num.partitions configuration option is set on Kafka brokers with this value when Mimir component that uses Kafka starts. This configuration option specifies the default number of partitions that the Kafka broker uses for auto-created topics. Note that this is a Kafka-cluster wide setting, and applies to any auto-created topic. If the setting of num.partitions fails, Mimir proceeds anyways, but auto-created topics could have an incorrect number of partitions.")

f.IntVar(&cfg.ProducerMaxRecordSizeBytes, prefix+".producer-max-record-size-bytes", maxProducerRecordDataBytesLimit, "The maximum size of a Kafka record data that should be generated by the producer. An incoming write request larger than this size is split into multiple Kafka records. We strongly recommend to not change this setting unless for testing purposes.")
f.IntVar(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. 0 to disable the limit.")
f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "The maximum size of (uncompressed) buffered and unacknowledged produced records sent to Kafka. The produce request fails once this limit is reached. This limit is per Kafka client. 0 to disable the limit.")

f.DurationVar(&cfg.WaitStrongReadConsistencyTimeout, prefix+".wait-strong-read-consistency-timeout", 20*time.Second, "The maximum allowed for a read requests processed by an ingester to wait until strong read consistency is enforced. 0 to disable the timeout.")
}
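
As context for the field change in this file: `ProducerMaxBufferedBytes` moved from `int` to `int64`, so it is now bound with `flag.Int64Var`. Below is a minimal, runnable sketch of this prefixed flag-registration pattern. The `KafkaConfigSketch` type and its two fields are illustrative only (the real `KafkaConfig` has many more fields); defaults are copied from the hunk above.

```go
package main

import (
	"flag"
	"fmt"
)

// KafkaConfigSketch mirrors only the producer-related fields touched by this diff.
type KafkaConfigSketch struct {
	ProducerMaxRecordSizeBytes int
	ProducerMaxBufferedBytes   int64
}

// RegisterFlagsWithPrefix registers the flags under the given prefix, in the same
// style as KafkaConfig.RegisterFlagsWithPrefix above. ProducerMaxBufferedBytes is
// now an int64, so it is bound with Int64Var instead of IntVar.
func (cfg *KafkaConfigSketch) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	f.IntVar(&cfg.ProducerMaxRecordSizeBytes, prefix+".producer-max-record-size-bytes", 15983616, "Maximum size of a Kafka record generated by the producer.")
	f.Int64Var(&cfg.ProducerMaxBufferedBytes, prefix+".producer-max-buffered-bytes", 1024*1024*1024, "Per-client limit on buffered and unacknowledged produced bytes. 0 disables the limit.")
}

func main() {
	var cfg KafkaConfigSketch
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlagsWithPrefix("ingest-storage.kafka", fs)

	// Override the buffered-bytes limit to 512 MiB on the command line.
	if err := fs.Parse([]string{"-ingest-storage.kafka.producer-max-buffered-bytes=536870912"}); err != nil {
		panic(err)
	}

	fmt.Println(cfg.ProducerMaxBufferedBytes) // 536870912
}
```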
126 changes: 22 additions & 104 deletions pkg/storage/ingest/writer.go
@@ -5,6 +5,7 @@ package ingest
import (
"context"
"fmt"
"strconv"
"sync"
"time"

@@ -15,7 +16,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/atomic"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util/globalerror"
@@ -37,6 +37,8 @@ const (
// in the worst case scenario, which is expected to be way above the actual one.
maxProducerRecordDataBytesLimit = producerBatchMaxBytes - 16384
minProducerRecordDataBytesLimit = 1024 * 1024

writerMetricsPrefix = "cortex_ingest_storage_writer_"
)

var (
@@ -59,18 +61,12 @@ type Writer struct {
// We support multiple Kafka clients to better parallelize the workload. The number of
// clients is fixed during the Writer lifecycle, but they're initialised lazily.
writersMx sync.RWMutex
writers []*kafkaWriterClient

// Keep track of Kafka records size (bytes) currently in-flight in the Kafka client.
// This counter is used to implement a limit on the max buffered bytes.
writersBufferedBytes *atomic.Int64
writers []*KafkaProducer

// Metrics.
writeRequestsTotal prometheus.Counter
writeFailuresTotal *prometheus.CounterVec
writeLatency prometheus.Histogram
writeBytesTotal prometheus.Counter
recordsPerRequest prometheus.Histogram
writeLatency prometheus.Histogram
writeBytesTotal prometheus.Counter
recordsPerRequest prometheus.Histogram

// The following settings can only be overridden in tests.
maxInflightProduceRequests int
@@ -81,19 +77,10 @@ func NewWriter(kafkaCfg KafkaConfig, logger log.Logger, reg prometheus.Registere
kafkaCfg: kafkaCfg,
logger: logger,
registerer: reg,
writers: make([]*kafkaWriterClient, kafkaCfg.WriteClients),
writers: make([]*KafkaProducer, kafkaCfg.WriteClients),
maxInflightProduceRequests: 20,
writersBufferedBytes: atomic.NewInt64(0),

// Metrics.
writeRequestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingest_storage_writer_produce_requests_total",
Help: "Total number of produce requests issued to Kafka.",
}),
writeFailuresTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingest_storage_writer_produce_failures_total",
Help: "Total number of failed produce requests issued to Kafka.",
}, []string{"reason"}),
writeLatency: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ingest_storage_writer_latency_seconds",
Help: "Latency to write an incoming request to the ingest storage.",
@@ -168,7 +155,7 @@ func (w *Writer) WriteSync(ctx context.Context, partitionID int32, userID string
// visibility over this metric if records are rejected by Kafka because of MESSAGE_TOO_LARGE).
w.recordsPerRequest.Observe(float64(len(records)))

res := w.produceSync(ctx, writer, records)
res := writer.ProduceSync(ctx, records)

// Track latency only for successfully written records.
if count, sizeBytes := successfulProduceRecordsStats(res); count > 0 {
@@ -187,69 +174,7 @@ func (w *Writer) WriteSync(ctx context.Context, partitionID int32, userID string
return nil
}

// produceSync produces records to Kafka and returns once all records have been successfully committed,
// or an error occurred.
func (w *Writer) produceSync(ctx context.Context, client *kafkaWriterClient, records []*kgo.Record) kgo.ProduceResults {
var (
remaining = atomic.NewInt64(int64(len(records)))
done = make(chan struct{})
resMx sync.Mutex
res = make(kgo.ProduceResults, 0, len(records))
maxBufferedBytes = int64(w.kafkaCfg.ProducerMaxBufferedBytes)
)

w.writeRequestsTotal.Add(float64(len(records)))

onProduceDone := func(r *kgo.Record, err error) {
if maxBufferedBytes > 0 {
w.writersBufferedBytes.Add(-int64(len(r.Value)))
}

resMx.Lock()
res = append(res, kgo.ProduceResult{Record: r, Err: err})
resMx.Unlock()

if err != nil {
w.writeFailuresTotal.WithLabelValues(produceErrReason(err)).Inc()
}

// In case of error we'll wait for all responses anyway before returning from produceSync().
// It allows us to keep code easier, given we don't expect this function to be frequently
// called with multiple records.
if remaining.Dec() == 0 {
close(done)
}
}

for _, record := range records {
// Fast fail if the Kafka client buffer is full. Buffered bytes counter is decreased in onProduceDone().
if maxBufferedBytes > 0 && w.writersBufferedBytes.Add(int64(len(record.Value))) > maxBufferedBytes {
onProduceDone(record, kgo.ErrMaxBuffered)
continue
}

// We use a new context to avoid that other Produce() may be cancelled when this call's context is
// canceled. It's important to note that cancelling the context passed to Produce() doesn't actually
// prevent the data to be sent over the wire (because it's never removed from the buffer) but in some
// cases may cause all requests to fail with context cancelled.
//
// Produce() may theoretically block if the buffer is full, but we configure the Kafka client with
// unlimited buffer because we implement the buffer limit ourselves (see maxBufferedBytes). This means
// Produce() should never block for us in practice.
client.Produce(context.WithoutCancel(ctx), record, onProduceDone)
}

// Wait for a response or until the context has done.
select {
case <-ctx.Done():
return kgo.ProduceResults{{Err: context.Cause(ctx)}}
case <-done:
// Once we're done, it's guaranteed that no more results will be appended, so we can safely return it.
return res
}
}

func (w *Writer) getKafkaWriterForPartition(partitionID int32) (*kafkaWriterClient, error) {
func (w *Writer) getKafkaWriterForPartition(partitionID int32) (*KafkaProducer, error) {
// Check if the writer has already been created.
w.writersMx.RLock()
clientID := int(partitionID) % len(w.writers)
@@ -268,10 +193,21 @@ func (w *Writer) getKafkaWriterForPartition(partitionID int32) (*kafkaWriterClie
if writer != nil {
return writer, nil
}
newWriter, err := newKafkaWriterClient(clientID, w.kafkaCfg, w.maxInflightProduceRequests, w.logger, w.registerer)

// Add the client ID to metrics so that they don't clash when the Writer is configured
// to run with multiple Kafka clients.
clientReg := prometheus.WrapRegistererWithPrefix(writerMetricsPrefix,
prometheus.WrapRegistererWith(prometheus.Labels{"client_id": strconv.Itoa(clientID)}, w.registerer))

// Add the client ID to logger so that we can easily distinguish Kafka clients in logs.
clientLogger := log.With(w.logger, "client_id", clientID)

newClient, err := newKafkaWriterClient(w.kafkaCfg, w.maxInflightProduceRequests, clientLogger, clientReg)
if err != nil {
return nil, err
}

newWriter := NewKafkaProducer(newClient, w.kafkaCfg.ProducerMaxBufferedBytes, clientReg)
w.writers[clientID] = newWriter
return newWriter, nil
}
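
The per-client metric wrapping above uses standard client_golang helpers (`WrapRegistererWithPrefix` and `WrapRegistererWith`). The self-contained sketch below (the `newPerClientRegisterer` helper name is hypothetical) shows how two clients can register the same counter name without clashing, each series ending up with its own `client_id` label.

```go
package main

import (
	"fmt"
	"strconv"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// newPerClientRegisterer (hypothetical name) wraps a base registerer the same way
// getKafkaWriterForPartition() does: metrics registered through it get the
// "cortex_ingest_storage_writer_" prefix plus a constant "client_id" label.
func newPerClientRegisterer(base prometheus.Registerer, clientID int) prometheus.Registerer {
	return prometheus.WrapRegistererWithPrefix("cortex_ingest_storage_writer_",
		prometheus.WrapRegistererWith(prometheus.Labels{"client_id": strconv.Itoa(clientID)}, base))
}

func main() {
	base := prometheus.NewRegistry()

	// Two Kafka clients register a counter with the same (unprefixed) name.
	for clientID := 0; clientID < 2; clientID++ {
		promauto.With(newPerClientRegisterer(base, clientID)).NewCounter(prometheus.CounterOpts{
			Name: "produce_requests_total",
			Help: "Total number of produce requests issued to Kafka.",
		}).Inc()
	}

	// The registry now exposes one family,
	// cortex_ingest_storage_writer_produce_requests_total, with two series
	// (client_id="0" and client_id="1") instead of a duplicate-registration error.
	mfs, err := base.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Println(mf.GetName(), "series:", len(mf.GetMetric()))
	}
}
```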
@@ -341,21 +277,3 @@ func successfulProduceRecordsStats(results kgo.ProduceResults) (count, sizeBytes

return
}

func produceErrReason(err error) string {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, kgo.ErrRecordTimeout) {
return "timeout"
}
if errors.Is(err, kgo.ErrMaxBuffered) {
return "buffer-full"
}
if errors.Is(err, kerr.MessageTooLarge) {
return "record-too-large"
}
if errors.Is(err, context.Canceled) {
// This should never happen because we don't cancel produce requests, however we
// check this error anyway to detect if something unexpected happened.
return "canceled"
}
return "other"
}
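
The extracted KafkaProducer itself lives in a file not shown on this page. What follows is only a sketch of what such a component plausibly looks like, reconstructed from the removed produceSync() above: the `KafkaProducerSketch` name and package are assumptions, and the error-reason mapping is collapsed into a single "other" label for brevity.

```go
package ingestsketch

import (
	"context"
	"sync"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/twmb/franz-go/pkg/kgo"
	"go.uber.org/atomic"
)

// KafkaProducerSketch wraps a franz-go client and owns the synchronous produce
// path that used to live in Writer.produceSync().
type KafkaProducerSketch struct {
	client           *kgo.Client
	maxBufferedBytes int64

	// Bytes currently buffered and unacknowledged in the Kafka client; used to
	// enforce the per-client producer-max-buffered-bytes limit (0 = unlimited).
	bufferedBytes *atomic.Int64

	produceRequestsTotal prometheus.Counter
	produceFailuresTotal *prometheus.CounterVec
}

// NewKafkaProducerSketch mirrors the NewKafkaProducer(client, maxBufferedBytes, reg)
// call site shown in getKafkaWriterForPartition() above.
func NewKafkaProducerSketch(client *kgo.Client, maxBufferedBytes int64, reg prometheus.Registerer) *KafkaProducerSketch {
	return &KafkaProducerSketch{
		client:           client,
		maxBufferedBytes: maxBufferedBytes,
		bufferedBytes:    atomic.NewInt64(0),
		produceRequestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
			Name: "produce_requests_total",
			Help: "Total number of produce requests issued to Kafka.",
		}),
		produceFailuresTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Name: "produce_failures_total",
			Help: "Total number of failed produce requests issued to Kafka.",
		}, []string{"reason"}),
	}
}

// ProduceSync produces records and blocks until every record has been
// acknowledged or failed, returning the per-record results.
func (p *KafkaProducerSketch) ProduceSync(ctx context.Context, records []*kgo.Record) kgo.ProduceResults {
	var (
		remaining = atomic.NewInt64(int64(len(records)))
		done      = make(chan struct{})
		resMx     sync.Mutex
		res       = make(kgo.ProduceResults, 0, len(records))
	)

	p.produceRequestsTotal.Add(float64(len(records)))

	onProduceDone := func(r *kgo.Record, err error) {
		if p.maxBufferedBytes > 0 {
			p.bufferedBytes.Add(-int64(len(r.Value)))
		}

		resMx.Lock()
		res = append(res, kgo.ProduceResult{Record: r, Err: err})
		resMx.Unlock()

		if err != nil {
			// The real code maps err to a reason via produceErrReason().
			p.produceFailuresTotal.WithLabelValues("other").Inc()
		}

		if remaining.Dec() == 0 {
			close(done)
		}
	}

	for _, record := range records {
		// Fail fast if this client's buffer limit is exceeded; the counter is
		// decreased again in onProduceDone().
		if p.maxBufferedBytes > 0 && p.bufferedBytes.Add(int64(len(record.Value))) > p.maxBufferedBytes {
			onProduceDone(record, kgo.ErrMaxBuffered)
			continue
		}

		// A non-cancellable context avoids failing records buffered by other,
		// concurrent callers when this call's context is canceled.
		p.client.Produce(context.WithoutCancel(ctx), record, onProduceDone)
	}

	select {
	case <-ctx.Done():
		return kgo.ProduceResults{{Err: context.Cause(ctx)}}
	case <-done:
		return res
	}
}
```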