Skip to content

Commit

Permalink
Add DeleteRecords operation to Broker
Browse files Browse the repository at this point in the history
  • Loading branch information
ongardie-ebay committed Jan 22, 2018
1 parent 0f4f8ca commit 0b541f5
Show file tree
Hide file tree
Showing 6 changed files with 372 additions and 0 deletions.
11 changes: 11 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,17 @@ func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsRespon
return response, nil
}

func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
response := new(DeleteRecordsResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) DescribeAcls(request *DescribeAclsRequest) (*DescribeAclsResponse, error) {
response := new(DescribeAclsResponse)

Expand Down
126 changes: 126 additions & 0 deletions delete_records_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package sarama

import (
"sort"
"time"
)

// request message format is:
// [topic] timeout(int32)
// where topic is:
// name(string) [partition]
// where partition is:
// id(int32) offset(int64)

type DeleteRecordsRequest struct {
Topics map[string]*DeleteRecordsRequestTopic
Timeout time.Duration
}

func (d *DeleteRecordsRequest) encode(pe packetEncoder) error {
if err := pe.putArrayLength(len(d.Topics)); err != nil {
return err
}
keys := make([]string, 0, len(d.Topics))
for topic := range d.Topics {
keys = append(keys, topic)
}
sort.Strings(keys)
for _, topic := range keys {
if err := pe.putString(topic); err != nil {
return err
}
if err := d.Topics[topic].encode(pe); err != nil {
return err
}
}
pe.putInt32(int32(d.Timeout / time.Millisecond))

return nil
}

func (d *DeleteRecordsRequest) decode(pd packetDecoder, version int16) error {
n, err := pd.getArrayLength()
if err != nil {
return err
}

if n > 0 {
d.Topics = make(map[string]*DeleteRecordsRequestTopic, n)
for i := 0; i < n; i++ {
topic, err := pd.getString()
if err != nil {
return err
}
details := new(DeleteRecordsRequestTopic)
if err = details.decode(pd, version); err != nil {
return err
}
d.Topics[topic] = details
}
}

timeout, err := pd.getInt32()
if err != nil {
return err
}
d.Timeout = time.Duration(timeout) * time.Millisecond

return nil
}

func (d *DeleteRecordsRequest) key() int16 {
return 21
}

func (d *DeleteRecordsRequest) version() int16 {
return 0
}

func (d *DeleteRecordsRequest) requiredVersion() KafkaVersion {
return V0_11_0_0
}

type DeleteRecordsRequestTopic struct {
PartitionOffsets map[int32]int64 // partition => offset
}

func (t *DeleteRecordsRequestTopic) encode(pe packetEncoder) error {
if err := pe.putArrayLength(len(t.PartitionOffsets)); err != nil {
return err
}
keys := make([]int32, 0, len(t.PartitionOffsets))
for partition := range t.PartitionOffsets {
keys = append(keys, partition)
}
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
for _, partition := range keys {
pe.putInt32(partition)
pe.putInt64(t.PartitionOffsets[partition])
}
return nil
}

func (t *DeleteRecordsRequestTopic) decode(pd packetDecoder, version int16) error {
n, err := pd.getArrayLength()
if err != nil {
return err
}

if n > 0 {
t.PartitionOffsets = make(map[int32]int64, n)
for i := 0; i < n; i++ {
partition, err := pd.getInt32()
if err != nil {
return err
}
offset, err := pd.getInt64()
if err != nil {
return err
}
t.PartitionOffsets[partition] = offset
}
}

return nil
}
36 changes: 36 additions & 0 deletions delete_records_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package sarama

import (
"testing"
"time"
)

var deleteRecordsRequest = []byte{
0, 0, 0, 2,
0, 5, 'o', 't', 'h', 'e', 'r',
0, 0, 0, 0,
0, 5, 't', 'o', 'p', 'i', 'c',
0, 0, 0, 2,
0, 0, 0, 19,
0, 0, 0, 0, 0, 0, 0, 200,
0, 0, 0, 20,
0, 0, 0, 0, 0, 0, 0, 190,
0, 0, 0, 100,
}

func TestDeleteRecordsRequest(t *testing.T) {
req := &DeleteRecordsRequest{
Topics: map[string]*DeleteRecordsRequestTopic{
"topic": {
PartitionOffsets: map[int32]int64{
19: 200,
20: 190,
},
},
"other": {},
},
Timeout: 100 * time.Millisecond,
}

testRequest(t, "", req, deleteRecordsRequest)
}
158 changes: 158 additions & 0 deletions delete_records_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package sarama

import (
"sort"
"time"
)

// response message format is:
// throttleMs(int32) [topic]
// where topic is:
// name(string) [partition]
// where partition is:
// id(int32) low_watermark(int64) error_code(int16)

type DeleteRecordsResponse struct {
Version int16
ThrottleTime time.Duration
Topics map[string]*DeleteRecordsResponseTopic
}

func (d *DeleteRecordsResponse) encode(pe packetEncoder) error {
pe.putInt32(int32(d.ThrottleTime / time.Millisecond))

if err := pe.putArrayLength(len(d.Topics)); err != nil {
return err
}
keys := make([]string, 0, len(d.Topics))
for topic := range d.Topics {
keys = append(keys, topic)
}
sort.Strings(keys)
for _, topic := range keys {
if err := pe.putString(topic); err != nil {
return err
}
if err := d.Topics[topic].encode(pe); err != nil {
return err
}
}
return nil
}

func (d *DeleteRecordsResponse) decode(pd packetDecoder, version int16) error {
d.Version = version

throttleTime, err := pd.getInt32()
if err != nil {
return err
}
d.ThrottleTime = time.Duration(throttleTime) * time.Millisecond

n, err := pd.getArrayLength()
if err != nil {
return err
}

if n > 0 {
d.Topics = make(map[string]*DeleteRecordsResponseTopic, n)
for i := 0; i < n; i++ {
topic, err := pd.getString()
if err != nil {
return err
}
details := new(DeleteRecordsResponseTopic)
if err = details.decode(pd, version); err != nil {
return err
}
d.Topics[topic] = details
}
}

return nil
}

func (d *DeleteRecordsResponse) key() int16 {
return 21
}

func (d *DeleteRecordsResponse) version() int16 {
return 0
}

func (d *DeleteRecordsResponse) requiredVersion() KafkaVersion {
return V0_11_0_0
}

type DeleteRecordsResponseTopic struct {
Partitions map[int32]*DeleteRecordsResponsePartition
}

func (t *DeleteRecordsResponseTopic) encode(pe packetEncoder) error {
if err := pe.putArrayLength(len(t.Partitions)); err != nil {
return err
}
keys := make([]int32, 0, len(t.Partitions))
for partition := range t.Partitions {
keys = append(keys, partition)
}
sort.Slice(keys, func(i, j int) bool { return keys[i] < keys[j] })
for _, partition := range keys {
pe.putInt32(partition)
if err := t.Partitions[partition].encode(pe); err != nil {
return err
}
}
return nil
}

func (t *DeleteRecordsResponseTopic) decode(pd packetDecoder, version int16) error {
n, err := pd.getArrayLength()
if err != nil {
return err
}

if n > 0 {
t.Partitions = make(map[int32]*DeleteRecordsResponsePartition, n)
for i := 0; i < n; i++ {
partition, err := pd.getInt32()
if err != nil {
return err
}
details := new(DeleteRecordsResponsePartition)
if err = details.decode(pd, version); err != nil {
return err
}
t.Partitions[partition] = details
}
}

return nil
}

type DeleteRecordsResponsePartition struct {
LowWatermark int64
Err KError
}

func (t *DeleteRecordsResponsePartition) encode(pe packetEncoder) error {
pe.putInt64(t.LowWatermark)
pe.putInt16(int16(t.Err))
return nil
}

func (t *DeleteRecordsResponsePartition) decode(pd packetDecoder, version int16) error {
lowWatermark, err := pd.getInt64()
if err != nil {
return err
}
t.LowWatermark = lowWatermark

kErr, err := pd.getInt16()
if err != nil {
return err
}
t.Err = KError(kErr)

return nil
}
39 changes: 39 additions & 0 deletions delete_records_response_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package sarama

import (
"testing"
"time"
)

var deleteRecordsResponse = []byte{
0, 0, 0, 100,
0, 0, 0, 2,
0, 5, 'o', 't', 'h', 'e', 'r',
0, 0, 0, 0,
0, 5, 't', 'o', 'p', 'i', 'c',
0, 0, 0, 2,
0, 0, 0, 19,
0, 0, 0, 0, 0, 0, 0, 200,
0, 0,
0, 0, 0, 20,
255, 255, 255, 255, 255, 255, 255, 255,
0, 3,
}

func TestDeleteRecordsResponse(t *testing.T) {
resp := &DeleteRecordsResponse{
Version: 0,
ThrottleTime: 100 * time.Millisecond,
Topics: map[string]*DeleteRecordsResponseTopic{
"topic": {
Partitions: map[int32]*DeleteRecordsResponsePartition{
19: {LowWatermark: 200, Err: 0},
20: {LowWatermark: -1, Err: 3},
},
},
"other": {},
},
}

testResponse(t, "", resp, deleteRecordsResponse)
}
2 changes: 2 additions & 0 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ func allocateBody(key, version int16) protocolBody {
return &CreateTopicsRequest{}
case 20:
return &DeleteTopicsRequest{}
case 21:
return &DeleteRecordsRequest{}
case 29:
return &DescribeAclsRequest{}
case 30:
Expand Down

0 comments on commit 0b541f5

Please sign in to comment.