From ad507c906e91b1fb4847ef5784d527ddac130ec1 Mon Sep 17 00:00:00 2001 From: bestgopher <84328409@qq.com> Date: Wed, 28 Apr 2021 10:37:02 +0800 Subject: [PATCH 1/5] remove redundant switch-case --- message.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/message.go b/message.go index e48566b37..33a0b25bb 100644 --- a/message.go +++ b/message.go @@ -146,14 +146,7 @@ func (m *Message) decode(pd packetDecoder) (err error) { // for future metrics about the compression ratio in fetch requests m.compressedSize = len(m.Value) - switch m.Codec { - case CompressionNone: - // nothing to do - default: - if m.Value == nil { - break - } - + if m.Value != nil { m.Value, err = decompress(m.Codec, m.Value) if err != nil { return err From 25785c44f0f59dba5cb5ba4fc41f70a6db30c814 Mon Sep 17 00:00:00 2001 From: bestgopher <84328409@qq.com> Date: Wed, 28 Apr 2021 11:30:36 +0800 Subject: [PATCH 2/5] fix comment error --- acl_delete_request.go | 2 +- mockbroker.go | 2 +- partitioner.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/acl_delete_request.go b/acl_delete_request.go index 415252259..6efb44303 100644 --- a/acl_delete_request.go +++ b/acl_delete_request.go @@ -48,7 +48,7 @@ func (d *DeleteAclsRequest) version() int16 { return int16(d.Version) } -func (c *DeleteAclsRequest) headerVersion() int16 { +func (d *DeleteAclsRequest) headerVersion() int16 { return 1 } diff --git a/mockbroker.go b/mockbroker.go index ff5a68ae7..0c99314e5 100644 --- a/mockbroker.go +++ b/mockbroker.go @@ -30,7 +30,7 @@ type RequestNotifierFunc func(bytesRead, bytesWritten int) // to facilitate testing of higher level or specialized consumers and producers // built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol, // but rather provides a facility to do that. It takes care of the TCP -// transport, request unmarshaling, response marshaling, and makes it the test +// transport, request unmarshalling, response marshalling, and makes it the test // writer responsibility to program correct according to the Kafka API protocol // MockBroker behaviour. // diff --git a/partitioner.go b/partitioner.go index 6a708e729..a66e11ea3 100644 --- a/partitioner.go +++ b/partitioner.go @@ -42,7 +42,7 @@ type PartitionerConstructor func(topic string) Partitioner type manualPartitioner struct{} -// HashPartitionOption lets you modify default values of the partitioner +// HashPartitionerOption lets you modify default values of the partitioner type HashPartitionerOption func(*hashPartitioner) // WithAbsFirst means that the partitioner handles absolute values From ad04c4e45877ce7193c1935e719d8c11d11fd487 Mon Sep 17 00:00:00 2001 From: bestgopher <84328409@qq.com> Date: Wed, 28 Apr 2021 14:03:11 +0800 Subject: [PATCH 3/5] decompress: optimize some code --- decompress.go | 30 ++++++++++++++---------------- encoder_decoder.go | 4 ++-- message.go | 3 ++- 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/decompress.go b/decompress.go index e4dc3c185..af45fdaf9 100644 --- a/decompress.go +++ b/decompress.go @@ -26,34 +26,32 @@ func decompress(cc CompressionCodec, data []byte) ([]byte, error) { case CompressionNone: return data, nil case CompressionGZIP: - var ( - err error - reader *gzip.Reader - readerIntf = gzipReaderPool.Get() - ) - if readerIntf != nil { - reader = readerIntf.(*gzip.Reader) - } else { + var err error + reader, ok := gzipReaderPool.Get().(*gzip.Reader) + if !ok { reader, err = gzip.NewReader(bytes.NewReader(data)) - if err != nil { - return nil, err - } + } else { + err = reader.Reset(bytes.NewReader(data)) } - defer gzipReaderPool.Put(reader) - - if err := reader.Reset(bytes.NewReader(data)); err != nil { + if err != nil { return nil, err } + defer gzipReaderPool.Put(reader) + return ioutil.ReadAll(reader) case CompressionSnappy: return snappy.Decode(data) case CompressionLZ4: - reader := lz4ReaderPool.Get().(*lz4.Reader) + reader, ok := lz4ReaderPool.Get().(*lz4.Reader) + if !ok { + reader = lz4.NewReader(bytes.NewReader(data)) + } else { + reader.Reset(bytes.NewReader(data)) + } defer lz4ReaderPool.Put(reader) - reader.Reset(bytes.NewReader(data)) return ioutil.ReadAll(reader) case CompressionZSTD: return zstdDecompress(nil, data) diff --git a/encoder_decoder.go b/encoder_decoder.go index 025bad61f..dab54f88c 100644 --- a/encoder_decoder.go +++ b/encoder_decoder.go @@ -45,7 +45,7 @@ func encode(e encoder, metricRegistry metrics.Registry) ([]byte, error) { return realEnc.raw, nil } -// Decoder is the interface that wraps the basic Decode method. +// decoder is the interface that wraps the basic Decode method. // Anything implementing Decoder can be extracted from bytes using Kafka's encoding rules. type decoder interface { decode(pd packetDecoder) error @@ -55,7 +55,7 @@ type versionedDecoder interface { decode(pd packetDecoder, version int16) error } -// Decode takes bytes and a Decoder and fills the fields of the decoder from the bytes, +// decode takes bytes and a decoder and fills the fields of the decoder from the bytes, // interpreted using Kafka's encoding rules. func decode(buf []byte, in decoder) error { if buf == nil { diff --git a/message.go b/message.go index 33a0b25bb..287f88df1 100644 --- a/message.go +++ b/message.go @@ -146,11 +146,12 @@ func (m *Message) decode(pd packetDecoder) (err error) { // for future metrics about the compression ratio in fetch requests m.compressedSize = len(m.Value) - if m.Value != nil { + if m.Value != nil && m.Codec == CompressionNone { m.Value, err = decompress(m.Codec, m.Value) if err != nil { return err } + if err := m.decodeSet(); err != nil { return err } From 06770ed6920947528af90b3186c58f02c781f180 Mon Sep 17 00:00:00 2001 From: bestgopher <84328409@qq.com> Date: Wed, 28 Apr 2021 22:38:11 +0800 Subject: [PATCH 4/5] admin: fix typo error --- admin.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/admin.go b/admin.go index e0b102034..c7ce20631 100644 --- a/admin.go +++ b/admin.go @@ -165,14 +165,14 @@ func isErrNoController(err error) bool { } // retryOnError will repeatedly call the given (error-returning) func in the -// case that its response is non-nil and retriable (as determined by the -// provided retriable func) up to the maximum number of tries permitted by +// case that its response is non-nil and retryable (as determined by the +// provided retryable func) up to the maximum number of tries permitted by // the admin client configuration -func (ca *clusterAdmin) retryOnError(retriable func(error) bool, fn func() error) error { +func (ca *clusterAdmin) retryOnError(retryable func(error) bool, fn func() error) error { var err error for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ { err = fn() - if err == nil || !retriable(err) { + if err == nil || !retryable(err) { return err } Logger.Printf( From f29bbec50302b262ad68ee898c555f9898aef317 Mon Sep 17 00:00:00 2001 From: bestgopher <84328409@qq.com> Date: Thu, 29 Apr 2021 08:04:00 +0800 Subject: [PATCH 5/5] decompress: fix error --- message.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/message.go b/message.go index 287f88df1..40b3ac9d1 100644 --- a/message.go +++ b/message.go @@ -146,7 +146,7 @@ func (m *Message) decode(pd packetDecoder) (err error) { // for future metrics about the compression ratio in fetch requests m.compressedSize = len(m.Value) - if m.Value != nil && m.Codec == CompressionNone { + if m.Value != nil && m.Codec != CompressionNone { m.Value, err = decompress(m.Codec, m.Value) if err != nil { return err