Skip to content

Commit

Permalink
kadm: ListOffsetsAfterMill(future) should return end offsets
Browse files Browse the repository at this point in the history
Previously, ListOffsetsAfterMilli(future ts) would return -1 offsets for
timestamps in the future / for when partitions do not have a record
after the requested timestamp.

After using this for a while, IMO this is not desirable behavior: if I
don't have an offset after a given time, then I want the end offset.
This changes the behavior to match that.
  • Loading branch information
twmb committed Oct 10, 2022
1 parent bbac68b commit 7b8d404
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 18 deletions.
14 changes: 14 additions & 0 deletions pkg/kadm/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,20 @@ func (se *ShardErrors) into() error {
return se
}

// Merges two shard errors; the input errors should come from the same request.
func mergeShardErrs(e1, e2 error) error {
var se1, se2 *ShardErrors
if !errors.As(e1, &se1) {
return e2
}
if !errors.As(e2, &se2) {
return e1
}
se1.Errs = append(se1.Errs, se2.Errs...)
se1.AllFailed = se1.AllFailed && se2.AllFailed
return se1
}

// Error returns an error indicating the name of the request that failed, the
// number of separate errors, and the first error.
func (e *ShardErrors) Error() string {
Expand Down
61 changes: 43 additions & 18 deletions pkg/kadm/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (cl *Client) ListCommittedOffsets(ctx context.Context, topics ...string) (L
// millisecond timestamp. Unlike listing start/end/committed offsets, offsets
// returned from this function also include the timestamp of the offset. If no
// topics are specified, all topics are listed. If a partition has no offsets
// after the requested millisecond, the offset will be -1.
// after the requested millisecond, the offset will be the current end offset.
//
// This may return *ShardErrors.
func (cl *Client) ListOffsetsAfterMilli(ctx context.Context, millisecond int64, topics ...string) (ListedOffsets, error) {
Expand All @@ -404,23 +404,12 @@ func (cl *Client) listOffsets(ctx context.Context, isolation int8, timestamp int
return nil, err
}

req := kmsg.NewPtrListOffsetsRequest()
req.IsolationLevel = isolation
for t, td := range tds {
rt := kmsg.NewListOffsetsRequestTopic()
rt.Topic = t
for p := range td.Partitions {
rp := kmsg.NewListOffsetsRequestTopicPartition()
rp.Partition = p
rp.Timestamp = timestamp
rt.Partitions = append(rt.Partitions, rp)
}
req.Topics = append(req.Topics, rt)
}

shards := cl.cl.RequestSharded(ctx, req)
// If we request with timestamps, we may request twice: once for after
// timestamps, and once for any -1 (and no error) offsets where the
// timestamp is in the future.
list := make(ListedOffsets)
return list, shardErrEach(req, shards, func(kr kmsg.Response) error {
rerequest := make(map[string][]int32)
shardfn := func(kr kmsg.Response) error {
resp := kr.(*kmsg.ListOffsetsResponse)
for _, t := range resp.Topics {
lt, ok := list[t.Topic]
Expand All @@ -440,8 +429,44 @@ func (cl *Client) listOffsets(ctx context.Context, isolation int8, timestamp int
LeaderEpoch: p.LeaderEpoch,
Err: kerr.ErrorForCode(p.ErrorCode),
}
if timestamp != -1 && p.Offset == -1 && p.ErrorCode == 0 {
rerequest[t.Topic] = append(rerequest[t.Topic], p.Partition)
}
}
}
return nil
})
}

req := kmsg.NewPtrListOffsetsRequest()
req.IsolationLevel = isolation
for t, td := range tds {
rt := kmsg.NewListOffsetsRequestTopic()
rt.Topic = t
for p := range td.Partitions {
rp := kmsg.NewListOffsetsRequestTopicPartition()
rp.Partition = p
rp.Timestamp = timestamp
rt.Partitions = append(rt.Partitions, rp)
}
req.Topics = append(req.Topics, rt)
}
shards := cl.cl.RequestSharded(ctx, req)
err = shardErrEach(req, shards, shardfn)
if len(rerequest) > 0 {
req.Topics = req.Topics[:0]
for t, ps := range rerequest {
rt := kmsg.NewListOffsetsRequestTopic()
rt.Topic = t
for _, p := range ps {
rp := kmsg.NewListOffsetsRequestTopicPartition()
rp.Partition = p
rp.Timestamp = -1 // we always list end offsets when rerequesting
rt.Partitions = append(rt.Partitions, rp)
}
req.Topics = append(req.Topics, rt)
}
shards = cl.cl.RequestSharded(ctx, req)
err = mergeShardErrs(err, shardErrEach(req, shards, shardfn))
}
return list, err
}

0 comments on commit 7b8d404

Please sign in to comment.