diff --git a/broker.go b/broker.go index 93c28ba5e9..737dc9c94c 100644 --- a/broker.go +++ b/broker.go @@ -395,6 +395,17 @@ func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsRespon return response, nil } +func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) { + response := new(DeleteRecordsResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) { response := new(DescribeAclsResponse) diff --git a/delete_records_request.go b/delete_records_request.go new file mode 100644 index 0000000000..93efafd4d0 --- /dev/null +++ b/delete_records_request.go @@ -0,0 +1,126 @@ +package sarama + +import ( + "sort" + "time" +) + +// request message format is: +// [topic] timeout(int32) +// where topic is: +// name(string) [partition] +// where partition is: +// id(int32) offset(int64) + +type DeleteRecordsRequest struct { + Topics map[string]*DeleteRecordsRequestTopic + Timeout time.Duration +} + +func (d *DeleteRecordsRequest) encode(pe packetEncoder) error { + if err := pe.putArrayLength(len(d.Topics)); err != nil { + return err + } + keys := make([]string, 0, len(d.Topics)) + for topic := range d.Topics { + keys = append(keys, topic) + } + sort.Strings(keys) + for _, topic := range keys { + if err := pe.putString(topic); err != nil { + return err + } + if err := d.Topics[topic].encode(pe); err != nil { + return err + } + } + pe.putInt32(int32(d.Timeout / time.Millisecond)) + + return nil +} + +func (d *DeleteRecordsRequest) decode(pd packetDecoder, version int16) error { + n, err := pd.getArrayLength() + if err != nil { + return err + } + + if n > 0 { + d.Topics = make(map[string]*DeleteRecordsRequestTopic, n) + for i := 0; i < n; i++ { + topic, err := pd.getString() + if err != nil { + return err + } + details := new(DeleteRecordsRequestTopic) + if err = details.decode(pd, version); err != nil { + return err + } + d.Topics[topic] = details + } + } + + timeout, err := pd.getInt32() + if err != nil { + return err + } + d.Timeout = time.Duration(timeout) * time.Millisecond + + return nil +} + +func (d *DeleteRecordsRequest) key() int16 { + return 21 +} + +func (d *DeleteRecordsRequest) version() int16 { + return 0 +} + +func (d *DeleteRecordsRequest) requiredVersion() KafkaVersion { + return V0_11_0_0 +} + +type DeleteRecordsRequestTopic struct { + PartitionOffsets map[int32]int64 // partition => offset +} + +func (t *DeleteRecordsRequestTopic) encode(pe packetEncoder) error { + if err := pe.putArrayLength(len(t.PartitionOffsets)); err != nil { + return err + } + keys := make([]int32, 0, len(t.PartitionOffsets)) + for partition := range t.PartitionOffsets { + keys = append(keys, partition) + } + sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) + for _, partition := range keys { + pe.putInt32(partition) + pe.putInt64(t.PartitionOffsets[partition]) + } + return nil +} + +func (t *DeleteRecordsRequestTopic) decode(pd packetDecoder, version int16) error { + n, err := pd.getArrayLength() + if err != nil { + return err + } + + if n > 0 { + t.PartitionOffsets = make(map[int32]int64, n) + for i := 0; i < n; i++ { + partition, err := pd.getInt32() + if err != nil { + return err + } + offset, err := pd.getInt64() + if err != nil { + return err + } + t.PartitionOffsets[partition] = offset + } + } + + return nil +} diff --git a/delete_records_request_test.go b/delete_records_request_test.go new file mode 100644 index 0000000000..c72960cfb0 --- /dev/null +++ b/delete_records_request_test.go @@ -0,0 +1,36 @@ +package sarama + +import ( + "testing" + "time" +) + +var deleteRecordsRequest = []byte{ + 0, 0, 0, 2, + 0, 5, 'o', 't', 'h', 'e', 'r', + 0, 0, 0, 0, + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 0, 0, 2, + 0, 0, 0, 19, + 0, 0, 0, 0, 0, 0, 0, 200, + 0, 0, 0, 20, + 0, 0, 0, 0, 0, 0, 0, 190, + 0, 0, 0, 100, +} + +func TestDeleteRecordsRequest(t *testing.T) { + req := &DeleteRecordsRequest{ + Topics: map[string]*DeleteRecordsRequestTopic{ + "topic": { + PartitionOffsets: map[int32]int64{ + 19: 200, + 20: 190, + }, + }, + "other": {}, + }, + Timeout: 100 * time.Millisecond, + } + + testRequest(t, "", req, deleteRecordsRequest) +} diff --git a/delete_records_response.go b/delete_records_response.go new file mode 100644 index 0000000000..733a58b6bc --- /dev/null +++ b/delete_records_response.go @@ -0,0 +1,158 @@ +package sarama + +import ( + "sort" + "time" +) + +// response message format is: +// throttleMs(int32) [topic] +// where topic is: +// name(string) [partition] +// where partition is: +// id(int32) low_watermark(int64) error_code(int16) + +type DeleteRecordsResponse struct { + Version int16 + ThrottleTime time.Duration + Topics map[string]*DeleteRecordsResponseTopic +} + +func (d *DeleteRecordsResponse) encode(pe packetEncoder) error { + pe.putInt32(int32(d.ThrottleTime / time.Millisecond)) + + if err := pe.putArrayLength(len(d.Topics)); err != nil { + return err + } + keys := make([]string, 0, len(d.Topics)) + for topic := range d.Topics { + keys = append(keys, topic) + } + sort.Strings(keys) + for _, topic := range keys { + if err := pe.putString(topic); err != nil { + return err + } + if err := d.Topics[topic].encode(pe); err != nil { + return err + } + } + return nil +} + +func (d *DeleteRecordsResponse) decode(pd packetDecoder, version int16) error { + d.Version = version + + throttleTime, err := pd.getInt32() + if err != nil { + return err + } + d.ThrottleTime = time.Duration(throttleTime) * time.Millisecond + + n, err := pd.getArrayLength() + if err != nil { + return err + } + + if n > 0 { + d.Topics = make(map[string]*DeleteRecordsResponseTopic, n) + for i := 0; i < n; i++ { + topic, err := pd.getString() + if err != nil { + return err + } + details := new(DeleteRecordsResponseTopic) + if err = details.decode(pd, version); err != nil { + return err + } + d.Topics[topic] = details + } + } + + return nil +} + +func (d *DeleteRecordsResponse) key() int16 { + return 21 +} + +func (d *DeleteRecordsResponse) version() int16 { + return 0 +} + +func (d *DeleteRecordsResponse) requiredVersion() KafkaVersion { + return V0_11_0_0 +} + +type DeleteRecordsResponseTopic struct { + Partitions map[int32]*DeleteRecordsResponsePartition +} + +func (t *DeleteRecordsResponseTopic) encode(pe packetEncoder) error { + if err := pe.putArrayLength(len(t.Partitions)); err != nil { + return err + } + keys := make([]int32, 0, len(t.Partitions)) + for partition := range t.Partitions { + keys = append(keys, partition) + } + sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] }) + for _, partition := range keys { + pe.putInt32(partition) + if err := t.Partitions[partition].encode(pe); err != nil { + return err + } + } + return nil +} + +func (t *DeleteRecordsResponseTopic) decode(pd packetDecoder, version int16) error { + n, err := pd.getArrayLength() + if err != nil { + return err + } + + if n > 0 { + t.Partitions = make(map[int32]*DeleteRecordsResponsePartition, n) + for i := 0; i < n; i++ { + partition, err := pd.getInt32() + if err != nil { + return err + } + details := new(DeleteRecordsResponsePartition) + if err = details.decode(pd, version); err != nil { + return err + } + t.Partitions[partition] = details + } + } + + return nil +} + +type DeleteRecordsResponsePartition struct { + LowWatermark int64 + Err KError +} + +func (t *DeleteRecordsResponsePartition) encode(pe packetEncoder) error { + pe.putInt64(t.LowWatermark) + pe.putInt16(int16(t.Err)) + return nil +} + +func (t *DeleteRecordsResponsePartition) decode(pd packetDecoder, version int16) error { + lowWatermark, err := pd.getInt64() + if err != nil { + return err + } + t.LowWatermark = lowWatermark + + kErr, err := pd.getInt16() + if err != nil { + return err + } + t.Err = KError(kErr) + + return nil +} diff --git a/delete_records_response_test.go b/delete_records_response_test.go new file mode 100644 index 0000000000..3653cdc419 --- /dev/null +++ b/delete_records_response_test.go @@ -0,0 +1,39 @@ +package sarama + +import ( + "testing" + "time" +) + +var deleteRecordsResponse = []byte{ + 0, 0, 0, 100, + 0, 0, 0, 2, + 0, 5, 'o', 't', 'h', 'e', 'r', + 0, 0, 0, 0, + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 0, 0, 2, + 0, 0, 0, 19, + 0, 0, 0, 0, 0, 0, 0, 200, + 0, 0, + 0, 0, 0, 20, + 255, 255, 255, 255, 255, 255, 255, 255, + 0, 3, +} + +func TestDeleteRecordsResponse(t *testing.T) { + resp := &DeleteRecordsResponse{ + Version: 0, + ThrottleTime: 100 * time.Millisecond, + Topics: map[string]*DeleteRecordsResponseTopic{ + "topic": { + Partitions: map[int32]*DeleteRecordsResponsePartition{ + 19: {LowWatermark: 200, Err: 0}, + 20: {LowWatermark: -1, Err: 3}, + }, + }, + "other": {}, + }, + } + + testResponse(t, "", resp, deleteRecordsResponse) +} diff --git a/request.go b/request.go index 1f5d57faeb..454503ba56 100644 --- a/request.go +++ b/request.go @@ -118,6 +118,8 @@ func allocateBody(key, version int16) protocolBody { return &CreateTopicsRequest{} case 20: return &DeleteTopicsRequest{} + case 21: + return &DeleteRecordsRequest{} case 29: return &DescribeAclsRequest{} case 30: