Skip to content

Commit

Permalink
Merge pull request IBM#1008 from buyology/delete-topics
Browse files Browse the repository at this point in the history
add DeleteTopicsRequest/Response
  • Loading branch information
eapache authored Dec 21, 2017
2 parents 541ca4a + 620549d commit 1fcddd9
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 0 deletions.
11 changes: 11 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,17 @@ func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsRespon
return response, nil
}

func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
response := new(DeleteTopicsResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
b.lock.Lock()
defer b.lock.Unlock()
Expand Down
41 changes: 41 additions & 0 deletions delete_topics_request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package sarama

import "time"

type DeleteTopicsRequest struct {
Topics []string
Timeout time.Duration
}

func (d *DeleteTopicsRequest) encode(pe packetEncoder) error {
if err := pe.putStringArray(d.Topics); err != nil {
return err
}
pe.putInt32(int32(d.Timeout / time.Millisecond))

return nil
}

func (d *DeleteTopicsRequest) decode(pd packetDecoder, version int16) (err error) {
if d.Topics, err = pd.getStringArray(); err != nil {
return err
}
timeout, err := pd.getInt32()
if err != nil {
return err
}
d.Timeout = time.Duration(timeout) * time.Millisecond
return nil
}

func (d *DeleteTopicsRequest) key() int16 {
return 20
}

func (d *DeleteTopicsRequest) version() int16 {
return 0
}

func (d *DeleteTopicsRequest) requiredVersion() KafkaVersion {
return V0_10_1_0
}
22 changes: 22 additions & 0 deletions delete_topics_request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package sarama

import (
"testing"
"time"
)

var deleteTopicsRequest = []byte{
0, 0, 0, 2,
0, 5, 't', 'o', 'p', 'i', 'c',
0, 5, 'o', 't', 'h', 'e', 'r',
0, 0, 0, 100,
}

func TestDeleteTopicsRequest(t *testing.T) {
req := &DeleteTopicsRequest{
Topics: []string{"topic", "other"},
Timeout: 100 * time.Millisecond,
}

testRequest(t, "", req, deleteTopicsRequest)
}
78 changes: 78 additions & 0 deletions delete_topics_response.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package sarama

import "time"

type DeleteTopicsResponse struct {
Version int16
ThrottleTime time.Duration
TopicErrorCodes map[string]KError
}

func (d *DeleteTopicsResponse) encode(pe packetEncoder) error {
if d.Version >= 1 {
pe.putInt32(int32(d.ThrottleTime / time.Millisecond))
}

if err := pe.putArrayLength(len(d.TopicErrorCodes)); err != nil {
return err
}
for topic, errorCode := range d.TopicErrorCodes {
if err := pe.putString(topic); err != nil {
return err
}
pe.putInt16(int16(errorCode))
}

return nil
}

func (d *DeleteTopicsResponse) decode(pd packetDecoder, version int16) (err error) {
if version >= 1 {
throttleTime, err := pd.getInt32()
if err != nil {
return err
}
d.ThrottleTime = time.Duration(throttleTime) * time.Millisecond

d.Version = version
}

n, err := pd.getArrayLength()
if err != nil {
return err
}

d.TopicErrorCodes = make(map[string]KError, n)

for i := 0; i < n; i++ {
topic, err := pd.getString()
if err != nil {
return err
}
errorCode, err := pd.getInt16()
if err != nil {
return err
}

d.TopicErrorCodes[topic] = KError(errorCode)
}

return nil
}

func (d *DeleteTopicsResponse) key() int16 {
return 20
}

func (d *DeleteTopicsResponse) version() int16 {
return d.Version
}

func (d *DeleteTopicsResponse) requiredVersion() KafkaVersion {
switch d.Version {
case 1:
return V0_11_0_0
default:
return V0_10_1_0
}
}
36 changes: 36 additions & 0 deletions delete_topics_response_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package sarama

import (
"testing"
"time"
)

var (
deleteTopicsResponseV0 = []byte{
0, 0, 0, 1,
0, 5, 't', 'o', 'p', 'i', 'c',
0, 0,
}

deleteTopicsResponseV1 = []byte{
0, 0, 0, 100,
0, 0, 0, 1,
0, 5, 't', 'o', 'p', 'i', 'c',
0, 0,
}
)

func TestDeleteTopicsResponse(t *testing.T) {
resp := &DeleteTopicsResponse{
TopicErrorCodes: map[string]KError{
"topic": ErrNoError,
},
}

testResponse(t, "version 0", resp, deleteTopicsResponseV0)

resp.Version = 1
resp.ThrottleTime = 100 * time.Millisecond

testResponse(t, "version 1", resp, deleteTopicsResponseV1)
}
2 changes: 2 additions & 0 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ func allocateBody(key, version int16) protocolBody {
return &ApiVersionsRequest{}
case 19:
return &CreateTopicsRequest{}
case 20:
return &DeleteTopicsRequest{}
case 37:
return &CreatePartitionsRequest{}
}
Expand Down

0 comments on commit 1fcddd9

Please sign in to comment.