diff --git a/broker.go b/broker.go index 3955ca2b4..e04c7ddc2 100644 --- a/broker.go +++ b/broker.go @@ -384,6 +384,17 @@ func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsRespon return response, nil } +func (b *Broker) DeleteTopics(request *DeleteTopicsRequest) (*DeleteTopicsResponse, error) { + response := new(DeleteTopicsResponse) + + err := b.sendAndReceive(request, response) + if err != nil { + return nil, err + } + + return response, nil +} + func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) { b.lock.Lock() defer b.lock.Unlock() diff --git a/delete_topics_request.go b/delete_topics_request.go new file mode 100644 index 000000000..ed9089ea4 --- /dev/null +++ b/delete_topics_request.go @@ -0,0 +1,41 @@ +package sarama + +import "time" + +type DeleteTopicsRequest struct { + Topics []string + Timeout time.Duration +} + +func (d *DeleteTopicsRequest) encode(pe packetEncoder) error { + if err := pe.putStringArray(d.Topics); err != nil { + return err + } + pe.putInt32(int32(d.Timeout / time.Millisecond)) + + return nil +} + +func (d *DeleteTopicsRequest) decode(pd packetDecoder, version int16) (err error) { + if d.Topics, err = pd.getStringArray(); err != nil { + return err + } + timeout, err := pd.getInt32() + if err != nil { + return err + } + d.Timeout = time.Duration(timeout) * time.Millisecond + return nil +} + +func (d *DeleteTopicsRequest) key() int16 { + return 20 +} + +func (d *DeleteTopicsRequest) version() int16 { + return 0 +} + +func (d *DeleteTopicsRequest) requiredVersion() KafkaVersion { + return V0_10_1_0 +} diff --git a/delete_topics_request_test.go b/delete_topics_request_test.go new file mode 100644 index 000000000..ce940823c --- /dev/null +++ b/delete_topics_request_test.go @@ -0,0 +1,22 @@ +package sarama + +import ( + "testing" + "time" +) + +var deleteTopicsRequest = []byte{ + 0, 0, 0, 2, + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 5, 'o', 't', 'h', 'e', 'r', + 0, 0, 0, 100, +} + +func TestDeleteTopicsRequest(t *testing.T) { + req := &DeleteTopicsRequest{ + Topics: []string{"topic", "other"}, + Timeout: 100 * time.Millisecond, + } + + testRequest(t, "", req, deleteTopicsRequest) +} diff --git a/delete_topics_response.go b/delete_topics_response.go new file mode 100644 index 000000000..34225460a --- /dev/null +++ b/delete_topics_response.go @@ -0,0 +1,78 @@ +package sarama + +import "time" + +type DeleteTopicsResponse struct { + Version int16 + ThrottleTime time.Duration + TopicErrorCodes map[string]KError +} + +func (d *DeleteTopicsResponse) encode(pe packetEncoder) error { + if d.Version >= 1 { + pe.putInt32(int32(d.ThrottleTime / time.Millisecond)) + } + + if err := pe.putArrayLength(len(d.TopicErrorCodes)); err != nil { + return err + } + for topic, errorCode := range d.TopicErrorCodes { + if err := pe.putString(topic); err != nil { + return err + } + pe.putInt16(int16(errorCode)) + } + + return nil +} + +func (d *DeleteTopicsResponse) decode(pd packetDecoder, version int16) (err error) { + if version >= 1 { + throttleTime, err := pd.getInt32() + if err != nil { + return err + } + d.ThrottleTime = time.Duration(throttleTime) * time.Millisecond + + d.Version = version + } + + n, err := pd.getArrayLength() + if err != nil { + return err + } + + d.TopicErrorCodes = make(map[string]KError, n) + + for i := 0; i < n; i++ { + topic, err := pd.getString() + if err != nil { + return err + } + errorCode, err := pd.getInt16() + if err != nil { + return err + } + + d.TopicErrorCodes[topic] = KError(errorCode) + } + + return nil +} + +func (d *DeleteTopicsResponse) key() int16 { + return 20 +} + +func (d *DeleteTopicsResponse) version() int16 { + return d.Version +} + +func (d *DeleteTopicsResponse) requiredVersion() KafkaVersion { + switch d.Version { + case 1: + return V0_11_0_0 + default: + return V0_10_1_0 + } +} diff --git a/delete_topics_response_test.go b/delete_topics_response_test.go new file mode 100644 index 000000000..516f1a3bd --- /dev/null +++ b/delete_topics_response_test.go @@ -0,0 +1,36 @@ +package sarama + +import ( + "testing" + "time" +) + +var ( + deleteTopicsResponseV0 = []byte{ + 0, 0, 0, 1, + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 0, + } + + deleteTopicsResponseV1 = []byte{ + 0, 0, 0, 100, + 0, 0, 0, 1, + 0, 5, 't', 'o', 'p', 'i', 'c', + 0, 0, + } +) + +func TestDeleteTopicsResponse(t *testing.T) { + resp := &DeleteTopicsResponse{ + TopicErrorCodes: map[string]KError{ + "topic": ErrNoError, + }, + } + + testResponse(t, "version 0", resp, deleteTopicsResponseV0) + + resp.Version = 1 + resp.ThrottleTime = 100 * time.Millisecond + + testResponse(t, "version 1", resp, deleteTopicsResponseV1) +} diff --git a/request.go b/request.go index 35d8c1d8b..282932ed7 100644 --- a/request.go +++ b/request.go @@ -116,6 +116,8 @@ func allocateBody(key, version int16) protocolBody { return &ApiVersionsRequest{} case 19: return &CreateTopicsRequest{} + case 20: + return &DeleteTopicsRequest{} case 37: return &CreatePartitionsRequest{} }