diff --git a/consumer.go b/consumer.go index 46c26134c..f9cd172b4 100644 --- a/consumer.go +++ b/consumer.go @@ -468,9 +468,7 @@ feederLoop: } for i, msg := range msgs { - for _, interceptor := range child.conf.Consumer.Interceptors { - msg.safelyApplyInterceptor(interceptor) - } + child.interceptors(msg) messageSelect: select { case <-child.dying: @@ -484,6 +482,7 @@ feederLoop: child.broker.acks.Done() remainingLoop: for _, msg = range msgs[i:] { + child.interceptors(msg) select { case child.messages <- msg: case <-child.dying: @@ -715,6 +714,12 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu return messages, nil } +func (child *partitionConsumer) interceptors(msg *ConsumerMessage) { + for _, interceptor := range child.conf.Consumer.Interceptors { + msg.safelyApplyInterceptor(interceptor) + } +} + type brokerConsumer struct { consumer *consumer broker *Broker