diff --git a/pkg/kadm/errors.go b/pkg/kadm/errors.go index 05bfff29..822bf614 100644 --- a/pkg/kadm/errors.go +++ b/pkg/kadm/errors.go @@ -99,6 +99,20 @@ func (se *ShardErrors) into() error { return se } +// Merges two shard errors; the input errors should come from the same request. +func mergeShardErrs(e1, e2 error) error { + var se1, se2 *ShardErrors + if !errors.As(e1, &se1) { + return e2 + } + if !errors.As(e2, &se2) { + return e1 + } + se1.Errs = append(se1.Errs, se2.Errs...) + se1.AllFailed = se1.AllFailed && se2.AllFailed + return se1 +} + // Error returns an error indicating the name of the request that failed, the // number of separate errors, and the first error. func (e *ShardErrors) Error() string { diff --git a/pkg/kadm/metadata.go b/pkg/kadm/metadata.go index 1a240277..1b69f1fb 100644 --- a/pkg/kadm/metadata.go +++ b/pkg/kadm/metadata.go @@ -391,7 +391,7 @@ func (cl *Client) ListCommittedOffsets(ctx context.Context, topics ...string) (L // millisecond timestamp. Unlike listing start/end/committed offsets, offsets // returned from this function also include the timestamp of the offset. If no // topics are specified, all topics are listed. If a partition has no offsets -// after the requested millisecond, the offset will be -1. +// after the requested millisecond, the offset will be the current end offset. // // This may return *ShardErrors. func (cl *Client) ListOffsetsAfterMilli(ctx context.Context, millisecond int64, topics ...string) (ListedOffsets, error) { @@ -404,23 +404,12 @@ func (cl *Client) listOffsets(ctx context.Context, isolation int8, timestamp int return nil, err } - req := kmsg.NewPtrListOffsetsRequest() - req.IsolationLevel = isolation - for t, td := range tds { - rt := kmsg.NewListOffsetsRequestTopic() - rt.Topic = t - for p := range td.Partitions { - rp := kmsg.NewListOffsetsRequestTopicPartition() - rp.Partition = p - rp.Timestamp = timestamp - rt.Partitions = append(rt.Partitions, rp) - } - req.Topics = append(req.Topics, rt) - } - - shards := cl.cl.RequestSharded(ctx, req) + // If we request with timestamps, we may request twice: once for after + // timestamps, and once for any -1 (and no error) offsets where the + // timestamp is in the future. list := make(ListedOffsets) - return list, shardErrEach(req, shards, func(kr kmsg.Response) error { + rerequest := make(map[string][]int32) + shardfn := func(kr kmsg.Response) error { resp := kr.(*kmsg.ListOffsetsResponse) for _, t := range resp.Topics { lt, ok := list[t.Topic] @@ -440,8 +429,44 @@ func (cl *Client) listOffsets(ctx context.Context, isolation int8, timestamp int LeaderEpoch: p.LeaderEpoch, Err: kerr.ErrorForCode(p.ErrorCode), } + if timestamp != -1 && p.Offset == -1 && p.ErrorCode == 0 { + rerequest[t.Topic] = append(rerequest[t.Topic], p.Partition) + } } } return nil - }) + } + + req := kmsg.NewPtrListOffsetsRequest() + req.IsolationLevel = isolation + for t, td := range tds { + rt := kmsg.NewListOffsetsRequestTopic() + rt.Topic = t + for p := range td.Partitions { + rp := kmsg.NewListOffsetsRequestTopicPartition() + rp.Partition = p + rp.Timestamp = timestamp + rt.Partitions = append(rt.Partitions, rp) + } + req.Topics = append(req.Topics, rt) + } + shards := cl.cl.RequestSharded(ctx, req) + err = shardErrEach(req, shards, shardfn) + if len(rerequest) > 0 { + req.Topics = req.Topics[:0] + for t, ps := range rerequest { + rt := kmsg.NewListOffsetsRequestTopic() + rt.Topic = t + for _, p := range ps { + rp := kmsg.NewListOffsetsRequestTopicPartition() + rp.Partition = p + rp.Timestamp = -1 // we always list end offsets when rerequesting + rt.Partitions = append(rt.Partitions, rp) + } + req.Topics = append(req.Topics, rt) + } + shards = cl.cl.RequestSharded(ctx, req) + err = mergeShardErrs(err, shardErrEach(req, shards, shardfn)) + } + return list, err }