Merge pull request #1923 from bestgopher/master
Remove redundant switch-case, fix doc typos
dnwe authored Apr 29, 2021
2 parents 02d5c83 + f29bbec commit 0e2e2a4
Showing 7 changed files with 25 additions and 33 deletions.
2 changes: 1 addition & 1 deletion acl_delete_request.go
@@ -48,7 +48,7 @@ func (d *DeleteAclsRequest) version() int16 {
 	return int16(d.Version)
 }
 
-func (c *DeleteAclsRequest) headerVersion() int16 {
+func (d *DeleteAclsRequest) headerVersion() int16 {
 	return 1
 }

8 changes: 4 additions & 4 deletions admin.go
@@ -165,14 +165,14 @@ func isErrNoController(err error) bool {
 }
 
 // retryOnError will repeatedly call the given (error-returning) func in the
-// case that its response is non-nil and retriable (as determined by the
-// provided retriable func) up to the maximum number of tries permitted by
+// case that its response is non-nil and retryable (as determined by the
+// provided retryable func) up to the maximum number of tries permitted by
 // the admin client configuration
-func (ca *clusterAdmin) retryOnError(retriable func(error) bool, fn func() error) error {
+func (ca *clusterAdmin) retryOnError(retryable func(error) bool, fn func() error) error {
 	var err error
 	for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ {
 		err = fn()
-		if err == nil || !retriable(err) {
+		if err == nil || !retryable(err) {
 			return err
 		}
 		Logger.Printf(
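
The renamed retryable predicate drives a bounded retry loop: call fn, return immediately on success or on a non-retryable error, and otherwise try again up to the configured maximum. A minimal standalone sketch of the same pattern, assuming a plain maxTries parameter in place of ca.conf.Admin.Retry.Max and omitting the Logger.Printf between attempts:

package main

import (
	"errors"
	"fmt"
)

// retryOnError calls fn up to maxTries times, stopping early on success
// or on an error the retryable predicate rejects.
func retryOnError(maxTries int, retryable func(error) bool, fn func() error) error {
	var err error
	for attempt := 0; attempt < maxTries; attempt++ {
		err = fn()
		if err == nil || !retryable(err) {
			return err
		}
	}
	return err // attempts exhausted; surface the last error
}

var errTransient = errors.New("transient")

func main() {
	calls := 0
	err := retryOnError(5,
		func(err error) bool { return errors.Is(err, errTransient) },
		func() error {
			calls++
			if calls < 3 {
				return errTransient
			}
			return nil
		})
	fmt.Println(calls, err) // 3 <nil>: two retryable failures, then success
}
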
30 changes: 14 additions & 16 deletions decompress.go
@@ -26,34 +26,32 @@ func decompress(cc CompressionCodec, data []byte) ([]byte, error) {
 	case CompressionNone:
 		return data, nil
 	case CompressionGZIP:
-		var (
-			err        error
-			reader     *gzip.Reader
-			readerIntf = gzipReaderPool.Get()
-		)
-		if readerIntf != nil {
-			reader = readerIntf.(*gzip.Reader)
-		} else {
+		var err error
+		reader, ok := gzipReaderPool.Get().(*gzip.Reader)
+		if !ok {
 			reader, err = gzip.NewReader(bytes.NewReader(data))
-			if err != nil {
-				return nil, err
-			}
+		} else {
+			err = reader.Reset(bytes.NewReader(data))
 		}
 
-		defer gzipReaderPool.Put(reader)
-
-		if err := reader.Reset(bytes.NewReader(data)); err != nil {
+		if err != nil {
 			return nil, err
 		}
 
+		defer gzipReaderPool.Put(reader)
+
 		return ioutil.ReadAll(reader)
 	case CompressionSnappy:
 		return snappy.Decode(data)
 	case CompressionLZ4:
-		reader := lz4ReaderPool.Get().(*lz4.Reader)
+		reader, ok := lz4ReaderPool.Get().(*lz4.Reader)
+		if !ok {
+			reader = lz4.NewReader(bytes.NewReader(data))
+		} else {
+			reader.Reset(bytes.NewReader(data))
+		}
 		defer lz4ReaderPool.Put(reader)
 
-		reader.Reset(bytes.NewReader(data))
 		return ioutil.ReadAll(reader)
 	case CompressionZSTD:
 		return zstdDecompress(nil, data)
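
The rewritten GZIP branch relies on Go's comma-ok type assertion: when gzipReaderPool is empty, Get() returns nil, the assertion to *gzip.Reader fails, and ok is false, so the separate readerIntf variable and nil check become unnecessary, and the error from either branch funnels into a single check. A self-contained sketch of the pattern, using io.ReadAll in place of the ioutil.ReadAll above:

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"sync"
)

// gzipReaderPool recycles *gzip.Reader values across calls.
var gzipReaderPool sync.Pool

func decompressGzip(data []byte) ([]byte, error) {
	var err error
	// Comma-ok assertion: an empty pool yields nil, the assertion fails,
	// and ok is false, so a fresh reader is built instead.
	reader, ok := gzipReaderPool.Get().(*gzip.Reader)
	if !ok {
		reader, err = gzip.NewReader(bytes.NewReader(data))
	} else {
		err = reader.Reset(bytes.NewReader(data))
	}
	if err != nil {
		return nil, err
	}
	defer gzipReaderPool.Put(reader)
	return io.ReadAll(reader)
}

func main() {
	var buf bytes.Buffer
	w := gzip.NewWriter(&buf)
	_, _ = w.Write([]byte("hello"))
	_ = w.Close()
	out, err := decompressGzip(buf.Bytes())
	fmt.Println(string(out), err) // hello <nil>
}
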
4 changes: 2 additions & 2 deletions encoder_decoder.go
@@ -45,7 +45,7 @@ func encode(e encoder, metricRegistry metrics.Registry) ([]byte, error) {
 	return realEnc.raw, nil
 }
 
-// Decoder is the interface that wraps the basic Decode method.
+// decoder is the interface that wraps the basic Decode method.
 // Anything implementing Decoder can be extracted from bytes using Kafka's encoding rules.
 type decoder interface {
 	decode(pd packetDecoder) error
@@ -55,7 +55,7 @@ type versionedDecoder interface {
 	decode(pd packetDecoder, version int16) error
 }
 
-// Decode takes bytes and a Decoder and fills the fields of the decoder from the bytes,
+// decode takes bytes and a decoder and fills the fields of the decoder from the bytes,
 // interpreted using Kafka's encoding rules.
 func decode(buf []byte, in decoder) error {
 	if buf == nil {
10 changes: 2 additions & 8 deletions message.go
@@ -146,18 +146,12 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 	// for future metrics about the compression ratio in fetch requests
 	m.compressedSize = len(m.Value)
 
-	switch m.Codec {
-	case CompressionNone:
-		// nothing to do
-	default:
-		if m.Value == nil {
-			break
-		}
-
+	if m.Value != nil && m.Codec != CompressionNone {
 		m.Value, err = decompress(m.Codec, m.Value)
 		if err != nil {
 			return err
 		}
+
 		if err := m.decodeSet(); err != nil {
 			return err
 		}
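
The deleted switch only did real work in its default arm and used break to skip a nil value, so it collapses to one guard: decompress only when the value is non-nil and the codec is not CompressionNone. A small sketch checking that the two forms agree, with an int codec and a byte slice standing in for the Message fields:

package main

import "fmt"

const CompressionNone = 0

// equivalent reports whether the old switch-based guard and the new
// single-condition guard reach the same decision for codec and value.
func equivalent(codec int, value []byte) bool {
	oldRuns := false
	switch codec {
	case CompressionNone:
		// nothing to do
	default:
		if value == nil {
			break // leaves the switch, skipping decompression
		}
		oldRuns = true
	}
	newRuns := value != nil && codec != CompressionNone
	return oldRuns == newRuns
}

func main() {
	fmt.Println(equivalent(CompressionNone, []byte("x"))) // true: both skip
	fmt.Println(equivalent(1, nil))                       // true: both skip
	fmt.Println(equivalent(1, []byte("x")))               // true: both decompress
}
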
2 changes: 1 addition & 1 deletion mockbroker.go
@@ -30,7 +30,7 @@ type RequestNotifierFunc func(bytesRead, bytesWritten int)
 // to facilitate testing of higher level or specialized consumers and producers
 // built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol,
 // but rather provides a facility to do that. It takes care of the TCP
-// transport, request unmarshaling, response marshaling, and makes it the test
+// transport, request unmarshalling, response marshalling, and makes it the test
 // writer responsibility to program correct according to the Kafka API protocol
 // MockBroker behaviour.
 //
2 changes: 1 addition & 1 deletion partitioner.go
@@ -42,7 +42,7 @@ type PartitionerConstructor func(topic string) Partitioner
 
 type manualPartitioner struct{}
 
-// HashPartitionOption lets you modify default values of the partitioner
+// HashPartitionerOption lets you modify default values of the partitioner
 type HashPartitionerOption func(*hashPartitioner)
 
 // WithAbsFirst means that the partitioner handles absolute values
