
Merge pull request #229 from twmb/instance_id
kgo: work around KIP-345 and KIP-814 limitations if possible
twmb authored Oct 20, 2022
2 parents b2aec9c + f8038de commit a7f5d0d
Showing 12 changed files with 127 additions and 47 deletions.
2 changes: 1 addition & 1 deletion pkg/kgo/broker.go
@@ -692,7 +692,7 @@ start:
// If we used a version larger than Kafka supports, Kafka replies with
// Version 0 and an UNSUPPORTED_VERSION error.
//
// Pre Kafka 2.4.0, we have to retry the request with version 0.
// Pre Kafka 2.4, we have to retry the request with version 0.
// Post, Kafka replies with all versions.
if rawResp[1] == 35 {
if maxVersion == 0 {
2 changes: 1 addition & 1 deletion pkg/kgo/client.go
@@ -1,4 +1,4 @@
// Package kgo provides a pure Go efficient Kafka client for Kafka 0.8.0+ with
// Package kgo provides a pure Go efficient Kafka client for Kafka 0.8+ with
// support for transactions, regex topic consuming, the latest partition
// strategies, and more. This client supports all client related KIPs.
//
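For illustration of the package doc above, a minimal sketch of constructing a client and producing one record; the broker address and topic are placeholders and error handling is kept to the bare minimum:

    package main

    import (
        "context"
        "fmt"

        "github.com/twmb/franz-go/pkg/kgo"
    )

    func main() {
        // The seed broker and topic below are placeholders for your cluster.
        cl, err := kgo.NewClient(kgo.SeedBrokers("localhost:9092"))
        if err != nil {
            panic(err)
        }
        defer cl.Close()

        // ProduceSync blocks until the record is acknowledged or fails.
        res := cl.ProduceSync(context.Background(), &kgo.Record{
            Topic: "my-topic",
            Value: []byte("hello"),
        })
        if err := res.FirstErr(); err != nil {
            panic(err)
        }
        fmt.Println("produced one record")
    }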
37 changes: 21 additions & 16 deletions pkg/kgo/config.go
@@ -523,7 +523,7 @@ func ClientID(id string) Opt {
}

// SoftwareNameAndVersion sets the client software name and version that will
// be sent to Kafka as part of the ApiVersions request as of Kafka 2.4.0,
// be sent to Kafka as part of the ApiVersions request as of Kafka 2.4,
// overriding the default "kgo" and internal version number.
//
// Kafka exposes this through metrics to help operators understand the impact
@@ -879,12 +879,12 @@ func MaxProduceRequestsInflightPerBroker(n int) ProducerOpt {
// records.
//
// Compression is chosen in the order preferred based on broker support. For
// example, zstd compression was introduced in Kafka 2.1.0, so the preference
// example, zstd compression was introduced in Kafka 2.1, so the preference
// can be first zstd, fallback snappy, fallback none.
//
// The default preference is [snappy, none], which should be fine for all old
// consumers since snappy compression has existed since Kafka 0.8.0. To use
// zstd, your brokers must be at least 2.1.0 and all consumers must be upgraded
// zstd, your brokers must be at least 2.1 and all consumers must be upgraded
// to support decoding zstd records.
func ProducerBatchCompression(preference ...CompressionCodec) ProducerOpt {
return producerOpt{func(cfg *cfg) { cfg.compression = preference }}
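To make the preference order above concrete, a sketch of preferring zstd with snappy and no compression as fallbacks; the codec constructors are from this package, while the broker address is a placeholder:

    package example

    import "github.com/twmb/franz-go/pkg/kgo"

    func newCompressedProducer() (*kgo.Client, error) {
        return kgo.NewClient(
            kgo.SeedBrokers("localhost:9092"), // placeholder
            // zstd first (needs 2.1+ brokers and zstd-capable consumers),
            // then snappy, then no compression at all.
            kgo.ProducerBatchCompression(
                kgo.ZstdCompression(),
                kgo.SnappyCompression(),
                kgo.NoCompression(),
            ),
        )
    }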
@@ -1062,7 +1062,7 @@ func RecordDeliveryTimeout(timeout time.Duration) ProducerOpt {
// After producing a batch, you must commit what you consumed. Auto committing
// offsets is disabled during transactional consuming / producing.
//
// Note that unless using Kafka 2.5.0, a consumer group rebalance may be
// Note that unless using Kafka 2.5, a consumer group rebalance may be
// problematic. Production should finish and be committed before the client
// rejoins the group. It may be safer to use an eager group balancer and just
// abort the transaction. Alternatively, any time a partition is revoked, you
@@ -1081,8 +1081,8 @@ func TransactionalID(id string) ProducerOpt {
// default 40s. It is a good idea to keep this less than a group's session
// timeout, so that a group member will always be alive for the duration of a
// transaction even if connectivity dies. This helps prevent a transaction
// finishing after a rebalance, which is problematic pre-Kafka 2.5.0. If you
// are on Kafka 2.5.0+, then you can use the RequireStableFetchOffsets option
// finishing after a rebalance, which is problematic pre-Kafka 2.5. If you
// are on Kafka 2.5+, then you can use the RequireStableFetchOffsets option
// when assigning the group, and you can set this to whatever you would like.
//
// Transaction timeouts begin when the first record is produced within a
@@ -1345,10 +1345,10 @@ func Balancers(balancers ...GroupBalancer) GroupOpt {
// initiate a rebalance.
//
// If you are using a GroupTransactSession for EOS, wish to lower this, and are
// talking to a Kafka cluster pre 2.5.0, consider lowering the
// talking to a Kafka cluster pre 2.5, consider lowering the
// TransactionTimeout. If you do not, you risk a transaction finishing after a
// group has rebalanced, which could lead to duplicate processing. If you are
// talking to a Kafka 2.5.0+ cluster, you can safely use the
// talking to a Kafka 2.5+ cluster, you can safely use the
// RequireStableFetchOffsets group option and prevent any problems.
//
// This option corresponds to Kafka's session.timeout.ms setting and must be
@@ -1386,7 +1386,7 @@ func HeartbeatInterval(interval time.Duration) GroupOpt {

// RequireStableFetchOffsets sets the group consumer to require "stable" fetch
// offsets before consuming from the group. Proposed in KIP-447 and introduced
// in Kafka 2.5.0, stable offsets are important when consuming from partitions
// in Kafka 2.5, stable offsets are important when consuming from partitions
// that a transactional producer could be committing to.
//
// With this option, Kafka will block group consumers from fetching offsets for
@@ -1597,15 +1597,15 @@ func AutoCommitMarks() GroupOpt {
// InstanceID sets the group consumer's instance ID, switching the group member
// from "dynamic" to "static".
//
// Prior to Kafka 2.3.0, joining a group gave a group member a new member ID.
// Prior to Kafka 2.3, joining a group gave a group member a new member ID.
// The group leader could not tell if this was a rejoining member. Thus, any
// join caused the group to rebalance.
//
// Kafka 2.3.0 introduced the concept of an instance ID, which can persist
// across restarts. This allows for avoiding many costly rebalances and allows
// for stickier rebalancing for rejoining members (since the ID for balancing
// stays the same). The main downsides are that you, the user of a client, have
// to manage instance IDs properly, and that it may take longer to rebalance in
// Kafka 2.3 introduced the concept of an instance ID, which can persist across
// restarts. This allows for avoiding many costly rebalances and allows for
// stickier rebalancing for rejoining members (since the ID for balancing stays
// the same). The main downsides are that you, the user of a client, have to
// manage instance IDs properly, and that it may take longer to rebalance in
// the event that a client legitimately dies.
//
// When using an instance ID, the client does NOT send a leave group request
@@ -1618,7 +1618,12 @@ func AutoCommitMarks() GroupOpt {
// issues a leave group request on behalf of this instance ID (see kcl), or you
// can manually use the kmsg package with a proper LeaveGroupRequest.
//
// NOTE: Leaving a group with an instance ID is only supported in Kafka 2.4.0+.
// NOTE: Leaving a group with an instance ID is only supported in Kafka 2.4+.
//
// NOTE: If you restart a consumer group leader that is using an instance ID,
// it will not cause a rebalance even if you change which topics the leader is
// consuming. If your cluster is 3.2+, this client internally works around this
// limitation and you do not need to trigger a rebalance manually.
func InstanceID(id string) GroupOpt {
return groupOpt{func(cfg *cfg) { cfg.instanceID = &id }}
}
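A sketch of a static group member using the option above; the group, topic, and ID scheme are illustrative, and the only real requirement is that each member's instance ID is unique and survives restarts:

    package example

    import "github.com/twmb/franz-go/pkg/kgo"

    func newStaticMember(hostID string) (*kgo.Client, error) {
        return kgo.NewClient(
            kgo.SeedBrokers("localhost:9092"), // placeholder
            kgo.ConsumerGroup("my-group"),     // illustrative group
            kgo.ConsumeTopics("my-topic"),     // illustrative topic
            // A stable, unique ID per member (e.g. derived from the host or
            // pod name) lets restarts rejoin without forcing a full rebalance.
            kgo.InstanceID("my-service-"+hostID),
        )
    }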
75 changes: 72 additions & 3 deletions pkg/kgo/consumer_group.go
@@ -1054,7 +1054,9 @@ start:
syncReq.InstanceID = g.cfg.instanceID
syncReq.ProtocolType = &g.cfg.protocol
syncReq.Protocol = &protocol
syncReq.GroupAssignment = plan // nil unless we are the leader
if !joinResp.SkipAssignment {
syncReq.GroupAssignment = plan // nil unless we are the leader
}
var (
syncResp *kmsg.SyncGroupResponse
synced = make(chan struct{})
@@ -1084,6 +1086,26 @@ start:
return err
}

// KIP-814 fixes one limitation with KIP-345, but has another
// fundamental limitation. When an instance ID leader restarts, its
// first join always gets its old assignment *even if* the member's
// topic interests have changed. The broker tells us to skip doing
// assignment ourselves, but we ignore that for our well known
// balancers. Instead, we balance (but avoid sending it while syncing,
// as we are supposed to), and if our sync assignment differs from our
own calculated assignment, we know we have a stale broker assignment
// and must trigger a rebalance.
if plan != nil && joinResp.SkipAssignment {
for _, assign := range plan {
if assign.MemberID == g.memberID {
if !bytes.Equal(assign.MemberAssignment, syncResp.MemberAssignment) {
g.rejoin("instance group leader restarted and was reassigned old plan, our topic interests changed and we must rejoin to force a rebalance")
}
break
}
}
}

return nil
}

@@ -1117,30 +1139,77 @@ func (g *groupConsumer) handleJoinResp(resp *kmsg.JoinGroupResponse) (restart bo
protocol = *resp.Protocol
}

// KIP-345 has a fundamental limitation that KIP-814 also does not
// solve.
//
// When using instance IDs, if a leader restarts, its first join
// receives its old assignment no matter what. KIP-345 resulted in
// leaderless consumer groups, KIP-814 fixes this by notifying the
// restarted leader that it is still leader but that it should not
// balance.
//
// If the join response is <= v8, we hackily work around the leaderless
// situation by checking if the LeaderID is prefixed with our
// InstanceID. This is how Kafka and Redpanda are both implemented. At
// worst, if we mis-predict the leader, then we may accidentally try to
// cause a rebalance later and it will do nothing. That's fine. At
// least we can cause rebalances now, rather than having a leaderless,
// not-ever-rebalancing client.
//
// KIP-814 does not solve our problem fully: if we restart and rejoin,
// we always get our old assignment even if we changed what topics we
// were interested in. Because we have our old assignment, we think
// that the plan is fine *even with* our new interests, and we wait for
// some external rebalance trigger. We work around this limitation
// above (see "KIP-814") only for well known balancers; we cannot work
// around this limitation for not well known balancers because they may
do weird things we cannot control or reason about.
leader := resp.LeaderID == resp.MemberID
leaderNoPlan := !leader && resp.Version <= 8 && g.cfg.instanceID != nil && strings.HasPrefix(resp.LeaderID, *g.cfg.instanceID+"-")
if leader {
g.leader.set(true)
g.cfg.logger.Log(LogLevelInfo, "joined, balancing group",
"group", g.cfg.group,
"member_id", g.memberID,
"instance_id", g.cfg.instanceID,
"instance_id", strptr{g.cfg.instanceID},
"generation", g.generation,
"balance_protocol", protocol,
"leader", true,
)
plan, err = g.balanceGroup(protocol, resp.Members, resp.SkipAssignment)
} else if leaderNoPlan {
g.leader.set(true)
g.cfg.logger.Log(LogLevelInfo, "joined as leader but unable to balance group due to KIP-345 limitations",
"group", g.cfg.group,
"member_id", g.memberID,
"instance_id", strptr{g.cfg.instanceID},
"generation", g.generation,
"balance_protocol", protocol,
"leader", true,
)
} else {
g.cfg.logger.Log(LogLevelInfo, "joined",
"group", g.cfg.group,
"member_id", g.memberID,
"instance_id", g.cfg.instanceID,
"instance_id", strptr{g.cfg.instanceID},
"generation", g.generation,
"leader", false,
)
}
return
}

type strptr struct {
s *string
}

func (s strptr) String() string {
if s.s == nil {
return "<nil>"
}
return *s.s
}

// If other group members consume topics we are not interested in, we track the
// entire group's topics in this groupExternal type. On metadata update, we see
// if any partitions for any of these topics have changed, and if so, we as
2 changes: 1 addition & 1 deletion pkg/kgo/errors.go
@@ -212,7 +212,7 @@ func (*ErrFirstReadEOF) Error() string {
// Unwrap returns io.EOF.
func (*ErrFirstReadEOF) Unwrap() error { return io.EOF }

// ErrDataLoss is returned for Kafka >=2.1.0 when data loss is detected and the
// ErrDataLoss is returned for Kafka >=2.1 when data loss is detected and the
// client is able to reset to the last valid offset.
type ErrDataLoss struct {
// Topic is the topic data loss was detected on.
15 changes: 9 additions & 6 deletions pkg/kgo/group_balancer.go
@@ -338,11 +338,7 @@ func (g *groupConsumer) findBalancer(from, proto string) (GroupBalancer, error)
// own metadata update to see if partition counts have changed for these random
// topics.
func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupResponseMember, skipBalance bool) ([]kmsg.SyncGroupRequestGroupAssignment, error) {
if skipBalance {
g.cl.cfg.logger.Log(LogLevelInfo, "parsing group balance as leader but not assigning (KIP-814)")
} else {
g.cl.cfg.logger.Log(LogLevelInfo, "balancing group as leader")
}
g.cl.cfg.logger.Log(LogLevelInfo, "balancing group as leader")

b, err := g.findBalancer("balance group", proto)
if err != nil {
@@ -443,7 +439,14 @@ func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupRespo
// have logged the current interests, we do not need to actually
// balance.
if skipBalance {
return nil, nil
switch proto := b.ProtocolName(); proto {
case RangeBalancer().ProtocolName(),
RoundRobinBalancer().ProtocolName(),
StickyBalancer().ProtocolName(),
CooperativeStickyBalancer().ProtocolName():
default:
return nil, nil
}
}

// If the returned IntoSyncAssignment is a BalancePlan, which it likely
4 changes: 2 additions & 2 deletions pkg/kgo/hooks.go
@@ -162,8 +162,8 @@ type HookBrokerThrottle interface {
// throttling interval, and whether the throttle was applied before
// Kafka responded to the request or after.
//
// For Kafka < 2.0.0, the throttle is applied before issuing a response.
// For Kafka >= 2.0.0, the throttle is applied after issuing a response.
// For Kafka < 2.0, the throttle is applied before issuing a response.
// For Kafka >= 2.0, the throttle is applied after issuing a response.
//
// If throttledAfterResponse is false, then Kafka already applied the
// throttle. If it is true, the client internally will not send another
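As an example of wiring up this hook, a sketch of an implementation that logs throttles; it assumes the interface's single OnBrokerThrottle method with the broker metadata, throttle interval, and throttledAfterResponse parameters described above:

    package example

    import (
        "log"
        "time"

        "github.com/twmb/franz-go/pkg/kgo"
    )

    // throttleLogger logs every throttle the client observes.
    type throttleLogger struct{}

    func (throttleLogger) OnBrokerThrottle(meta kgo.BrokerMetadata, interval time.Duration, throttledAfterResponse bool) {
        log.Printf("broker %d (%s) throttled us for %v (throttled after response: %v)",
            meta.NodeID, meta.Host, interval, throttledAfterResponse)
    }

    func newClientWithThrottleHook() (*kgo.Client, error) {
        return kgo.NewClient(
            kgo.SeedBrokers("localhost:9092"), // placeholder
            kgo.WithHooks(throttleLogger{}),   // hooks are matched by interface
        )
    }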
4 changes: 2 additions & 2 deletions pkg/kgo/producer.go
@@ -659,10 +659,10 @@ func (cl *Client) producerID() (int64, int16, error) {
// for every partition. Otherwise, we will use a new id/epoch for a partition
// and trigger OOOSN errors.
//
// Pre 2.5.0, this function is only called if it is acceptable to continue
// Pre 2.5, this function is only called if it is acceptable to continue
// on data loss (idempotent producer with no StopOnDataLoss option).
//
// 2.5.0+, it is safe to call this if the producer ID can be reset (KIP-360),
// 2.5+, it is safe to call this if the producer ID can be reset (KIP-360),
// in EndTransaction.
func (cl *Client) resetAllProducerSequences() {
for _, tp := range cl.producer.topics.load() {
16 changes: 8 additions & 8 deletions pkg/kgo/sink.go
@@ -22,9 +22,9 @@ type sink struct {
nodeID int32 // the node ID of the broker this sink belongs to

// inflightSem controls the number of concurrent produce requests. We
// start with a limit of 1, which covers Kafka v0.11.0.0. On the first
// start with a limit of 1, which covers Kafka v0.11.0. On the first
// response, we check what version was set in the request. If it is at
// least 4, which 1.0.0 introduced, we upgrade the sem size.
// least 4, which 1.0 introduced, we upgrade the sem size.
inflightSem atomic.Value
produceVersion int32 // atomic, negative is unset, positive is version
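The resizable inflight limit described above can be pictured with the sketch below; the names and structure are purely illustrative, not the client's internals: a channel semaphore held in an atomic.Value, started at capacity 1 and swapped for a larger one once a newer produce version is seen.

    package example

    import "sync/atomic"

    // inflight is an illustrative resizable semaphore. acquire returns the
    // channel the token landed on so release drains that same channel even
    // if the semaphore is swapped for a bigger one in the meantime.
    type inflight struct{ sem atomic.Value } // holds a chan struct{}

    func newInflight() *inflight {
        i := new(inflight)
        i.sem.Store(make(chan struct{}, 1)) // conservative limit of 1 to start
        return i
    }

    func (i *inflight) acquire() chan struct{} {
        c := i.sem.Load().(chan struct{})
        c <- struct{}{}
        return c
    }

    func release(c chan struct{}) { <-c }

    // upgrade swaps in a larger semaphore, e.g. once the broker is known to
    // handle produce request v4+ and more concurrent requests are safe.
    func (i *inflight) upgrade(n int) { i.sem.Store(make(chan struct{}, n)) }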

@@ -730,7 +730,7 @@ func (s *sink) handleReqRespBatch(
err == kerr.InvalidProducerIDMapping,
err == kerr.InvalidProducerEpoch:

// OOOSN always means data loss 1.0.0+ and is ambiguous prior.
// OOOSN always means data loss 1.0+ and is ambiguous prior.
// We assume the worst and only continue if requested.
//
// UnknownProducerID was introduced to allow some form of safe
@@ -740,9 +740,9 @@
// InvalidMapping is similar to UnknownProducerID, but occurs
// when the txnal coordinator timed out our transaction.
//
// 2.5.0
// 2.5
// =====
// 2.5.0 introduced some behavior to potentially safely reset
// 2.5 introduced some behavior to potentially safely reset
// the sequence numbers by bumping an epoch (see KIP-360).
//
// For the idempotent producer, the solution is to fail all
@@ -756,9 +756,9 @@
// For the transactional producer, we always fail the producerID.
// EndTransaction will trigger recovery if possible.
//
// 2.7.0
// 2.7
// =====
// InvalidProducerEpoch became retriable in 2.7.0. Prior, it
// InvalidProducerEpoch became retriable in 2.7. Prior, it
// was ambiguous (timeout? fenced?). Now, InvalidProducerEpoch
// is only returned on produce, and then we can recover on other
// txn coordinator requests, which have PRODUCER_FENCED vs
@@ -2012,7 +2012,7 @@ func (b seqRecBatch) appendTo(
dst = kbin.AppendInt32(dst, batchLen)

dst = kbin.AppendInt32(dst, -1) // partitionLeaderEpoch, unused in clients
dst = kbin.AppendInt8(dst, 2) // magic, defined as 2 for records v0.11.0.0+
dst = kbin.AppendInt8(dst, 2) // magic, defined as 2 for records v0.11.0+

crcStart := len(dst) // fill at end
dst = kbin.AppendInt32(dst, 0) // reserved crc
2 changes: 1 addition & 1 deletion pkg/kgo/source.go
@@ -855,7 +855,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe

case kerr.OffsetOutOfRange:
// If we are out of range, we reset to what we can.
// With Kafka >= 2.1.0, we should only get offset out
// With Kafka >= 2.1, we should only get offset out
// of range if we fetch before the start, but a user
// could start past the end and want to reset to
// the end. We respect that.
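Related to the reset behavior above, a sketch of telling the consumer where to start and where to reset to when offsets are out of range; the option and offset constructors are from this package, the topic is a placeholder:

    package example

    import "github.com/twmb/franz-go/pkg/kgo"

    func newConsumerFromStart() (*kgo.Client, error) {
        return kgo.NewClient(
            kgo.SeedBrokers("localhost:9092"), // placeholder
            kgo.ConsumeTopics("my-topic"),     // placeholder topic
            // Start at (and reset to) the earliest available offset rather
            // than the default of the end of the partition.
            kgo.ConsumeResetOffset(kgo.NewOffset().AtStart()),
        )
    }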
4 changes: 2 additions & 2 deletions pkg/kgo/txn.go
@@ -56,7 +56,7 @@ type GroupTransactSession struct {
// The problem with (a) is that if your ETL work loop is slow, you run the risk
// of exceeding the rebalance timeout and being kicked from the group. You will
// try to commit, and depending on the Kafka version, the commit may even be
// erroneously successful (pre Kafka 2.5.0). This will lead to duplicates.
// erroneously successful (pre Kafka 2.5). This will lead to duplicates.
//
// Instead, for safety, a GroupTransactSession favors (b). If a rebalance
// occurs at any time before ending a transaction with a commit, this will
@@ -753,7 +753,7 @@ func (cl *Client) AbortBufferedRecords(ctx context.Context) error {
// not retry with TryAbort.
//
// If records failed with UnknownProducerID and your Kafka version is at least
// 2.5.0, then aborting here will potentially allow the client to recover for
// 2.5, then aborting here will potentially allow the client to recover for
// more production.
//
// Note that canceling the context will likely leave the client in an
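To tie the transactional pieces above together, a rough sketch of an EOS consume-modify-produce loop with a GroupTransactSession; the option names are from this package, while the group, topics, and "processing" are placeholders, and fetch error handling is elided:

    package example

    import (
        "context"

        "github.com/twmb/franz-go/pkg/kgo"
    )

    func runEOSLoop(ctx context.Context) error {
        sess, err := kgo.NewGroupTransactSession(
            kgo.SeedBrokers("localhost:9092"), // placeholder
            kgo.TransactionalID("my-transactional-id"),
            kgo.ConsumerGroup("my-group"),
            kgo.ConsumeTopics("input-topic"),
            kgo.FetchIsolationLevel(kgo.ReadCommitted()),
            kgo.RequireStableFetchOffsets(), // best paired with 2.5+ brokers
        )
        if err != nil {
            return err
        }
        defer sess.Close()

        for ctx.Err() == nil {
            fetches := sess.PollFetches(ctx) // fetch errors elided for brevity
            if err := sess.Begin(); err != nil {
                return err
            }
            var out []*kgo.Record
            fetches.EachRecord(func(r *kgo.Record) {
                // "Process" by forwarding each value to an output topic.
                out = append(out, &kgo.Record{Topic: "output-topic", Value: r.Value})
            })
            if err := sess.ProduceSync(ctx, out...).FirstErr(); err != nil {
                return err
            }
            // End commits what was produced and consumed together, or aborts
            // if a rebalance interrupted the transaction.
            if _, err := sess.End(ctx, kgo.TryCommit); err != nil {
                return err
            }
        }
        return ctx.Err()
    }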
11 changes: 7 additions & 4 deletions pkg/kversion/kversion.go
@@ -322,7 +322,7 @@ func (vs *Versions) String() string {
// Stable is a shortcut for the latest _released_ Kafka versions.
//
// This is the default version used in kgo to avoid breaking tip changes.
func Stable() *Versions { return zkBrokerOf(max300) }
func Stable() *Versions { return zkBrokerOf(maxStable) }

// Tip is the latest defined Kafka key versions; this may be slightly out of date.
func Tip() *Versions { return zkBrokerOf(maxTip) }
@@ -934,6 +934,9 @@ var max330 = nextMax(max320, func(v listenerKeys) listenerKeys {
return v
})

var maxTip = nextMax(max330, func(v listenerKeys) listenerKeys {
return v
})
var (
maxStable = max330
maxTip = nextMax(maxStable, func(v listenerKeys) listenerKeys {
return v
})
)
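For reference, a sketch of how a user can pin the client to this stable version set, e.g. against brokers that mishandle newer request versions; option names are from these packages and the broker address is a placeholder:

    package example

    import (
        "github.com/twmb/franz-go/pkg/kgo"
        "github.com/twmb/franz-go/pkg/kversion"
    )

    func newPinnedClient() (*kgo.Client, error) {
        return kgo.NewClient(
            kgo.SeedBrokers("localhost:9092"), // placeholder
            // Cap request versions at the latest released Kafka rather than tip.
            kgo.MaxVersions(kversion.Stable()),
        )
    }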
