kgo: consistently use \d.\d rather than \d.\d.0 for kafka versions
twmb committed Oct 20, 2022
1 parent b18341d commit f8038de
Showing 9 changed files with 33 additions and 33 deletions.
2 changes: 1 addition & 1 deletion pkg/kgo/broker.go
@@ -692,7 +692,7 @@ start:
// If we used a version larger than Kafka supports, Kafka replies with
// Version 0 and an UNSUPPORTED_VERSION error.
//
-// Pre Kafka 2.4.0, we have to retry the request with version 0.
+// Pre Kafka 2.4, we have to retry the request with version 0.
// Post, Kafka replies with all versions.
if rawResp[1] == 35 {
if maxVersion == 0 {
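
The fallback this hunk documents can be sketched as follows. This is a simplified illustration of the described ApiVersions negotiation, not the client's internal code; `sendApiVersions` and its response type are hypothetical stand-ins:

```go
package main

import "errors"

// Hypothetical stand-in for the client's internal ApiVersions plumbing.
type apiVersionsResp struct{ errorCode int16 }

func sendApiVersions(version int16) (apiVersionsResp, error) {
	// Network round trip elided in this sketch.
	return apiVersionsResp{}, nil
}

const unsupportedVersion = 35 // Kafka's UNSUPPORTED_VERSION error code

// negotiate retries at version 0 when the broker rejects our max version,
// mirroring the pre-2.4 behavior described above. Post 2.4, Kafka instead
// replies with all versions it supports, so no retry is needed.
func negotiate(maxVersion int16) (apiVersionsResp, error) {
	resp, err := sendApiVersions(maxVersion)
	if err != nil {
		return resp, err
	}
	if resp.errorCode == unsupportedVersion {
		if maxVersion == 0 {
			return resp, errors.New("broker rejected ApiVersions version 0")
		}
		return sendApiVersions(0) // pre-2.4 fallback
	}
	return resp, nil
}

func main() { _, _ = negotiate(3) }
```
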
2 changes: 1 addition & 1 deletion pkg/kgo/client.go
@@ -1,4 +1,4 @@
-// Package kgo provides a pure Go efficient Kafka client for Kafka 0.8.0+ with
+// Package kgo provides a pure Go efficient Kafka client for Kafka 0.8+ with
// support for transactions, regex topic consuming, the latest partition
// strategies, and more. This client supports all client related KIPs.
//
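
As context for the package doc above, a minimal use of the client looks like this (a sketch; the broker address and topic are illustrative):

```go
package main

import (
	"context"
	"fmt"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
	if err != nil {
		panic(err)
	}
	defer cl.Close()

	// Produce one record and block until it is acknowledged.
	rec := &kgo.Record{Topic: "my-topic", Value: []byte("hello")}
	if err := cl.ProduceSync(context.Background(), rec).FirstErr(); err != nil {
		fmt.Println("produce failed:", err)
	}
}
```
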
30 changes: 15 additions & 15 deletions pkg/kgo/config.go
@@ -523,7 +523,7 @@ func ClientID(id string) Opt {
}

// SoftwareNameAndVersion sets the client software name and version that will
-// be sent to Kafka as part of the ApiVersions request as of Kafka 2.4.0,
+// be sent to Kafka as part of the ApiVersions request as of Kafka 2.4,
// overriding the default "kgo" and internal version number.
//
// Kafka exposes this through metrics to help operators understand the impact
@@ -879,12 +879,12 @@ func MaxProduceRequestsInflightPerBroker(n int) ProducerOpt {
// records.
//
// Compression is chosen in the order preferred based on broker support. For
-// example, zstd compression was introduced in Kafka 2.1.0, so the preference
+// example, zstd compression was introduced in Kafka 2.1, so the preference
// can be first zstd, fallback snappy, fallback none.
//
// The default preference is [snappy, none], which should be fine for all old
// consumers since snappy compression has existed since Kafka 0.8.0. To use
-// zstd, your brokers must be at least 2.1.0 and all consumers must be upgraded
+// zstd, your brokers must be at least 2.1 and all consumers must be upgraded
// to support decoding zstd records.
func ProducerBatchCompression(preference ...CompressionCodec) ProducerOpt {
return producerOpt{func(cfg *cfg) { cfg.compression = preference }}
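
A usage sketch of the preference order described above (the codec constructors are existing kgo options; the broker address is illustrative and error handling is elided):

```go
cl, err := kgo.NewClient(
	kgo.SeedBrokers("localhost:9092"),
	// Prefer zstd (brokers 2.1+), fall back to snappy, then to none.
	kgo.ProducerBatchCompression(
		kgo.ZstdCompression(),
		kgo.SnappyCompression(),
		kgo.NoCompression(),
	),
)
```
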
@@ -1062,7 +1062,7 @@ func RecordDeliveryTimeout(timeout time.Duration) ProducerOpt {
// After producing a batch, you must commit what you consumed. Auto committing
// offsets is disabled during transactional consuming / producing.
//
-// Note that unless using Kafka 2.5.0, a consumer group rebalance may be
+// Note that unless using Kafka 2.5, a consumer group rebalance may be
// problematic. Production should finish and be committed before the client
// rejoins the group. It may be safer to use an eager group balancer and just
// abort the transaction. Alternatively, any time a partition is revoked, you
@@ -1081,8 +1081,8 @@ func TransactionalID(id string) ProducerOpt {
// default 40s. It is a good idea to keep this less than a group's session
// timeout, so that a group member will always be alive for the duration of a
// transaction even if connectivity dies. This helps prevent a transaction
-// finishing after a rebalance, which is problematic pre-Kafka 2.5.0. If you
-// are on Kafka 2.5.0+, then you can use the RequireStableFetchOffsets option
+// finishing after a rebalance, which is problematic pre-Kafka 2.5. If you
+// are on Kafka 2.5+, then you can use the RequireStableFetchOffsets option
// when assigning the group, and you can set this to whatever you would like.
//
// Transaction timeouts begin when the first record is produced within a
@@ -1345,10 +1345,10 @@ func Balancers(balancers ...GroupBalancer) GroupOpt {
// initiate a rebalance.
//
// If you are using a GroupTransactSession for EOS, wish to lower this, and are
-// talking to a Kafka cluster pre 2.5.0, consider lowering the
+// talking to a Kafka cluster pre 2.5, consider lowering the
// TransactionTimeout. If you do not, you risk a transaction finishing after a
// group has rebalanced, which could lead to duplicate processing. If you are
-// talking to a Kafka 2.5.0+ cluster, you can safely use the
+// talking to a Kafka 2.5+ cluster, you can safely use the
// RequireStableFetchOffsets group option and prevent any problems.
//
// This option corresponds to Kafka's session.timeout.ms setting and must be
@@ -1386,7 +1386,7 @@ func HeartbeatInterval(interval time.Duration) GroupOpt {

// RequireStableFetchOffsets sets the group consumer to require "stable" fetch
// offsets before consuming from the group. Proposed in KIP-447 and introduced
-// in Kafka 2.5.0, stable offsets are important when consuming from partitions
+// in Kafka 2.5, stable offsets are important when consuming from partitions
// that a transactional producer could be committing to.
//
// With this option, Kafka will block group consumers from fetching offsets for
@@ -1597,15 +1597,15 @@ func AutoCommitMarks() GroupOpt {
// InstanceID sets the group consumer's instance ID, switching the group member
// from "dynamic" to "static".
//
-// Prior to Kafka 2.3.0, joining a group gave a group member a new member ID.
+// Prior to Kafka 2.3, joining a group gave a group member a new member ID.
// The group leader could not tell if this was a rejoining member. Thus, any
// join caused the group to rebalance.
//
-// Kafka 2.3.0 introduced the concept of an instance ID, which can persist
-// across restarts. This allows for avoiding many costly rebalances and allows
-// for stickier rebalancing for rejoining members (since the ID for balancing
-// stays the same). The main downsides are that you, the user of a client, have
-// to manage instance IDs properly, and that it may take longer to rebalance in
+// Kafka 2.3 introduced the concept of an instance ID, which can persist across
+// restarts. This allows for avoiding many costly rebalances and allows for
+// stickier rebalancing for rejoining members (since the ID for balancing stays
+// the same). The main downsides are that you, the user of a client, have to
+// manage instance IDs properly, and that it may take longer to rebalance in
// the event that a client legitimately dies.
//
// When using an instance ID, the client does NOT send a leave group request
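
Pulling the group options from this file together, a static-membership consumer that also requires stable offsets might be configured as below (a sketch; the group, topic, and instance ID values are illustrative and error handling is elided):

```go
cl, err := kgo.NewClient(
	kgo.SeedBrokers("localhost:9092"),
	kgo.ConsumeTopics("my-topic"),
	kgo.ConsumerGroup("my-group"),
	// Static membership (Kafka 2.3+): rejoining with the same instance
	// ID avoids a full rebalance on restart.
	kgo.InstanceID("worker-1"),
	// KIP-447 stable offsets (Kafka 2.5+): do not fetch offsets that a
	// transactional producer may still be committing.
	kgo.RequireStableFetchOffsets(),
)
```
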
2 changes: 1 addition & 1 deletion pkg/kgo/errors.go
@@ -212,7 +212,7 @@ func (*ErrFirstReadEOF) Error() string {
// Unwrap returns io.EOF.
func (*ErrFirstReadEOF) Unwrap() error { return io.EOF }

-// ErrDataLoss is returned for Kafka >=2.1.0 when data loss is detected and the
+// ErrDataLoss is returned for Kafka >=2.1 when data loss is detected and the
// client is able to reset to the last valid offset.
type ErrDataLoss struct {
// Topic is the topic data loss was detected on.
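
Callers can observe this reset via errors.As when draining fetch errors; a sketch (the ConsumedTo and ResetTo fields are assumed from the type's documentation):

```go
func logDataLoss(ctx context.Context, cl *kgo.Client) {
	fetches := cl.PollFetches(ctx)
	fetches.EachError(func(topic string, partition int32, err error) {
		var loss *kgo.ErrDataLoss
		if errors.As(err, &loss) {
			// The client has already reset to the last valid offset;
			// we only report the lost range.
			log.Printf("data loss on %s[%d]: consumed to %d, reset to %d",
				loss.Topic, loss.Partition, loss.ConsumedTo, loss.ResetTo)
		}
	})
}
```
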
4 changes: 2 additions & 2 deletions pkg/kgo/hooks.go
@@ -162,8 +162,8 @@ type HookBrokerThrottle interface {
// throttling interval, and whether the throttle was applied before
// Kafka responded to the request or after.
//
-// For Kafka < 2.0.0, the throttle is applied before issuing a response.
-// For Kafka >= 2.0.0, the throttle is applied after issuing a response.
+// For Kafka < 2.0, the throttle is applied before issuing a response.
+// For Kafka >= 2.0, the throttle is applied after issuing a response.
//
// If throttledAfterResponse is false, then Kafka already applied the
// throttle. If it is true, the client internally will not send another
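
A hook observing these throttles could look like the sketch below (assuming the OnBrokerThrottle signature from this interface; the logging is illustrative):

```go
type throttleLogger struct{}

var _ kgo.HookBrokerThrottle = throttleLogger{}

// OnBrokerThrottle logs each throttle. Per the comment above, for Kafka 2.0+
// (throttledAfterResponse true) the client itself waits out the interval
// before sending the broker another request.
func (throttleLogger) OnBrokerThrottle(meta kgo.BrokerMetadata, interval time.Duration, throttledAfterResponse bool) {
	log.Printf("broker %d throttled us %s (after response: %v)",
		meta.NodeID, interval, throttledAfterResponse)
}

// Installed at construction: kgo.NewClient(kgo.WithHooks(throttleLogger{}), ...)
```
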
4 changes: 2 additions & 2 deletions pkg/kgo/producer.go
@@ -659,10 +659,10 @@ func (cl *Client) producerID() (int64, int16, error) {
// for every partition. Otherwise, we will use a new id/epoch for a partition
// and trigger OOOSN errors.
//
-// Pre 2.5.0, this function is only called if it is acceptable to continue
+// Pre 2.5, this function is only called if it is acceptable to continue
// on data loss (idempotent producer with no StopOnDataLoss option).
//
-// 2.5.0+, it is safe to call this if the producer ID can be reset (KIP-360),
+// 2.5+, it is safe to call this if the producer ID can be reset (KIP-360),
// in EndTransaction.
func (cl *Client) resetAllProducerSequences() {
for _, tp := range cl.producer.topics.load() {
16 changes: 8 additions & 8 deletions pkg/kgo/sink.go
@@ -22,9 +22,9 @@ type sink struct {
nodeID int32 // the node ID of the broker this sink belongs to

// inflightSem controls the number of concurrent produce requests. We
-// start with a limit of 1, which covers Kafka v0.11.0.0. On the first
+// start with a limit of 1, which covers Kafka v0.11.0. On the first
// response, we check what version was set in the request. If it is at
-// least 4, which 1.0.0 introduced, we upgrade the sem size.
+// least 4, which 1.0 introduced, we upgrade the sem size.
inflightSem atomic.Value
produceVersion int32 // atomic, negative is unset, positive is version

@@ -726,7 +726,7 @@ func (s *sink) handleReqRespBatch(
err == kerr.InvalidProducerIDMapping,
err == kerr.InvalidProducerEpoch:

-// OOOSN always means data loss 1.0.0+ and is ambiguous prior.
+// OOOSN always means data loss 1.0+ and is ambiguous prior.
// We assume the worst and only continue if requested.
//
// UnknownProducerID was introduced to allow some form of safe
@@ -736,9 +736,9 @@
// InvalidMapping is similar to UnknownProducerID, but occurs
// when the txnal coordinator timed out our transaction.
//
-// 2.5.0
+// 2.5
// =====
-// 2.5.0 introduced some behavior to potentially safely reset
+// 2.5 introduced some behavior to potentially safely reset
// the sequence numbers by bumping an epoch (see KIP-360).
//
// For the idempotent producer, the solution is to fail all
@@ -752,9 +752,9 @@
// For the transactional producer, we always fail the producerID.
// EndTransaction will trigger recovery if possible.
//
-// 2.7.0
+// 2.7
// =====
-// InvalidProducerEpoch became retriable in 2.7.0. Prior, it
+// InvalidProducerEpoch became retriable in 2.7. Prior, it
// was ambiguous (timeout? fenced?). Now, InvalidProducerEpoch
// is only returned on produce, and then we can recover on other
// txn coordinator requests, which have PRODUCER_FENCED vs
@@ -2008,7 +2008,7 @@ func (b seqRecBatch) appendTo(
dst = kbin.AppendInt32(dst, batchLen)

dst = kbin.AppendInt32(dst, -1) // partitionLeaderEpoch, unused in clients
-dst = kbin.AppendInt8(dst, 2) // magic, defined as 2 for records v0.11.0.0+
+dst = kbin.AppendInt8(dst, 2) // magic, defined as 2 for records v0.11.0+

crcStart := len(dst) // fill at end
dst = kbin.AppendInt32(dst, 0) // reserved crc
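
For reference, the v2 batch header prefix being built in this hunk can be condensed into a helper like the following sketch (same kbin append helpers; the leading base-offset field comes from the record batch v2 format rather than the lines shown above):

```go
// appendBatchHeaderPrefix writes the leading fields of a v2 record batch:
// base offset, batch length, partition leader epoch (-1 in clients), the
// magic byte (2 for records v0.11.0+), and a zero crc placeholder that is
// filled in once the rest of the batch is serialized.
func appendBatchHeaderPrefix(dst []byte, baseOffset int64, batchLen int32) []byte {
	dst = kbin.AppendInt64(dst, baseOffset)
	dst = kbin.AppendInt32(dst, batchLen)
	dst = kbin.AppendInt32(dst, -1) // partitionLeaderEpoch, unused in clients
	dst = kbin.AppendInt8(dst, 2)   // magic, records v2
	dst = kbin.AppendInt32(dst, 0)  // reserved crc, filled at end
	return dst
}
```
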
2 changes: 1 addition & 1 deletion pkg/kgo/source.go
@@ -855,7 +855,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe

case kerr.OffsetOutOfRange:
// If we are out of range, we reset to what we can.
-// With Kafka >= 2.1.0, we should only get offset out
+// With Kafka >= 2.1, we should only get offset out
// of range if we fetch before the start, but a user
// could start past the end and want to reset to
// the end. We respect that.
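
The user-facing control over where such resets land is the consumer's reset offset option; a sketch (addresses and topic illustrative, error handling elided):

```go
cl, err := kgo.NewClient(
	kgo.SeedBrokers("localhost:9092"),
	kgo.ConsumeTopics("my-topic"),
	// On OffsetOutOfRange (e.g. fetching before the log start), restart
	// from the earliest available offset; AtEnd() would jump to the end.
	kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
)
```
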
4 changes: 2 additions & 2 deletions pkg/kgo/txn.go
@@ -56,7 +56,7 @@ type GroupTransactSession struct {
// The problem with (a) is that if your ETL work loop is slow, you run the risk
// of exceeding the rebalance timeout and being kicked from the group. You will
// try to commit, and depending on the Kafka version, the commit may even be
-// erroneously successful (pre Kafka 2.5.0). This will lead to duplicates.
+// erroneously successful (pre Kafka 2.5). This will lead to duplicates.
//
// Instead, for safety, a GroupTransactSession favors (b). If a rebalance
// occurs at any time before ending a transaction with a commit, this will
@@ -753,7 +753,7 @@ func (cl *Client) AbortBufferedRecords(ctx context.Context) error {
// not retry with TryAbort.
//
// If records failed with UnknownProducerID and your Kafka version is at least
-// 2.5.0, then aborting here will potentially allow the client to recover for
+// 2.5, then aborting here will potentially allow the client to recover for
// more production.
//
// Note that canceling the context will likely leave the client in an
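
End to end, the rebalance-safe flow this type's doc describes looks roughly like the sketch below (topics, group, and transactional ID are illustrative; error handling abbreviated):

```go
sess, err := kgo.NewGroupTransactSession(
	kgo.SeedBrokers("localhost:9092"),
	kgo.TransactionalID("my-txn-id"),
	kgo.ConsumerGroup("my-group"),
	kgo.ConsumeTopics("in-topic"),
)
if err != nil {
	panic(err)
}
defer sess.Close()

ctx := context.Background()
for {
	fetches := sess.PollFetches(ctx)
	if err := sess.Begin(); err != nil {
		panic(err)
	}
	fetches.EachRecord(func(r *kgo.Record) {
		sess.Produce(ctx, &kgo.Record{Topic: "out-topic", Value: r.Value}, nil)
	})
	// End commits consumed offsets and produced records together; if a
	// rebalance happened mid-transaction, the session aborts instead,
	// avoiding the pre-2.5 duplicate window described above.
	if _, err := sess.End(ctx, kgo.TryCommit); err != nil {
		panic(err)
	}
}
```
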
