Skip to content

Commit

Permalink
[receiver/kakfareceiver] add: text unmarshaler support in kafka recei… (
Browse files Browse the repository at this point in the history
#20857)

Add text unmarshaler, which will decode the kafka message as text and insert it as the body of a log record.

Link to tracking Issue: #20734
  • Loading branch information
h0cheung authored May 17, 2023
1 parent 28aa9ef commit 42c87ac
Show file tree
Hide file tree
Showing 11 changed files with 487 additions and 8 deletions.
16 changes: 16 additions & 0 deletions .chloggen/kafka_receiver_text.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkareceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add `text` unmarshaler, which will decode the kafka message as text and insert it as the body of a log record.

# One or more tracking issues related to the change
issues: [20734]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
2 changes: 1 addition & 1 deletion internal/coreinternal/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
go.opentelemetry.io/otel v1.15.1
go.opentelemetry.io/otel/trace v1.15.1
go.uber.org/zap v1.24.0
golang.org/x/text v0.9.0
gopkg.in/yaml.v3 v3.0.1
)

Expand All @@ -31,7 +32,6 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
google.golang.org/grpc v1.55.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
Expand Down
107 changes: 107 additions & 0 deletions internal/coreinternal/textutils/encoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package textutils // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/textutils"

import (
"errors"
"fmt"
"strings"

"golang.org/x/text/encoding"
"golang.org/x/text/encoding/ianaindex"
"golang.org/x/text/encoding/unicode"
"golang.org/x/text/transform"
)

// NewBasicConfig creates a new Encoding config
func NewEncodingConfig() EncodingConfig {
return EncodingConfig{
Encoding: "utf-8",
}
}

// EncodingConfig is the configuration of a Encoding helper
type EncodingConfig struct {
Encoding string `mapstructure:"encoding,omitempty"`
}

// Build will build an Encoding operator.
func (c EncodingConfig) Build() (Encoding, error) {
enc, err := lookupEncoding(c.Encoding)
if err != nil {
return Encoding{}, err
}

return Encoding{
Encoding: enc,
decodeBuffer: make([]byte, 1<<12),
decoder: enc.NewDecoder(),
}, nil
}

type Encoding struct {
Encoding encoding.Encoding
decoder *encoding.Decoder
decodeBuffer []byte
}

// Decode converts the bytes in msgBuf to utf-8 from the configured encoding
func (e *Encoding) Decode(msgBuf []byte) ([]byte, error) {
for {
e.decoder.Reset()
nDst, _, err := e.decoder.Transform(e.decodeBuffer, msgBuf, true)
if err == nil {
return e.decodeBuffer[:nDst], nil
}
if errors.Is(err, transform.ErrShortDst) {
e.decodeBuffer = make([]byte, len(e.decodeBuffer)*2)
continue
}
return nil, fmt.Errorf("transform encoding: %w", err)
}
}

var encodingOverrides = map[string]encoding.Encoding{
"utf-16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
"utf16": unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM),
"utf-8": unicode.UTF8,
"utf8": unicode.UTF8,
"ascii": unicode.UTF8,
"us-ascii": unicode.UTF8,
"nop": encoding.Nop,
"": unicode.UTF8,
}

func lookupEncoding(enc string) (encoding.Encoding, error) {
if e, ok := encodingOverrides[strings.ToLower(enc)]; ok {
return e, nil
}
e, err := ianaindex.IANA.Encoding(enc)
if err != nil {
return nil, fmt.Errorf("unsupported encoding '%s'", enc)
}
if e == nil {
return nil, fmt.Errorf("no charmap defined for encoding '%s'", enc)
}
return e, nil
}

func IsNop(enc string) bool {
e, err := lookupEncoding(enc)
if err != nil {
return false
}
return e == encoding.Nop
}
64 changes: 64 additions & 0 deletions internal/coreinternal/textutils/encoding_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package textutils

import (
"testing"

"github.com/stretchr/testify/assert"
"golang.org/x/text/encoding"
"golang.org/x/text/encoding/japanese"
"golang.org/x/text/encoding/korean"
"golang.org/x/text/encoding/simplifiedchinese"
"golang.org/x/text/encoding/unicode"
)

func TestUTF8Encoding(t *testing.T) {
tests := []struct {
name string
encoding encoding.Encoding
encodingName string
}{
{
name: "UTF8 encoding",
encoding: unicode.UTF8,
encodingName: "utf8",
},
{
name: "GBK encoding",
encoding: simplifiedchinese.GBK,
encodingName: "gbk",
},
{
name: "SHIFT_JIS encoding",
encoding: japanese.ShiftJIS,
encodingName: "shift_jis",
},
{
name: "EUC-KR encoding",
encoding: korean.EUCKR,
encodingName: "euc-kr",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
encCfg := NewEncodingConfig()
encCfg.Encoding = test.encodingName
enc, err := encCfg.Build()
assert.NoError(t, err)
assert.Equal(t, test.encoding, enc.Encoding)
})
}
}
1 change: 1 addition & 0 deletions receiver/kafkareceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ The following settings can be optionally configured:
- `zipkin_json`: the payload is deserialized into a list of Zipkin V2 JSON spans.
- `zipkin_thrift`: the payload is deserialized into a list of Zipkin Thrift spans.
- `raw`: (logs only) the payload's bytes are inserted as the body of a log record.
- `text`: (logs only) the payload are decoded as text and inserted as the body of a log record. By default, it uses UTF-8 to decode. You can use `text_<ENCODING>`, like `text_utf-8`, `text_shift_jis`, etc., to customize this behavior.
- `group_id` (default = otel-collector): The consumer group that receiver will be consuming messages from
- `client_id` (default = otel-collector): The consumer client ID that receiver will use
- `initial_offset` (default = latest): The initial offset to use if no offset was previously committed. Must be `latest` or `earliest`.
Expand Down
42 changes: 35 additions & 7 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collect
import (
"context"
"fmt"
"strings"
"sync"

"github.com/Shopify/sarama"
Expand Down Expand Up @@ -271,11 +272,6 @@ func (c *kafkaMetricsConsumer) Shutdown(context.Context) error {
}

func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers map[string]LogsUnmarshaler, nextConsumer consumer.Logs) (*kafkaLogsConsumer, error) {
unmarshaler := unmarshalers[config.Encoding]
if unmarshaler == nil {
return nil, errUnrecognizedEncoding
}

c := sarama.NewConfig()
c.ClientID = config.ClientID
c.Metadata.Full = config.Metadata.Full
Expand All @@ -288,14 +284,19 @@ func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers ma
} else {
return nil, err
}
unmarshaler, err := getLogsUnmarshaler(config.Encoding, unmarshalers)
if err != nil {
return nil, err
}
if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
var version sarama.KafkaVersion
version, err = sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
return nil, err
}
c.Version = version
}
if err := kafkaexporter.ConfigureAuthentication(config.Authentication, c); err != nil {
if err = kafkaexporter.ConfigureAuthentication(config.Authentication, c); err != nil {
return nil, err
}
client, err := sarama.NewConsumerGroup(config.Brokers, config.GroupID, c)
Expand All @@ -313,6 +314,33 @@ func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers ma
}, nil
}

func getLogsUnmarshaler(encoding string, unmarshalers map[string]LogsUnmarshaler) (LogsUnmarshaler, error) {
var enc string
unmarshaler, ok := unmarshalers[encoding]
if !ok {
split := strings.SplitN(encoding, "_", 2)
prefix := split[0]
if len(split) > 1 {
enc = split[1]
}
unmarshaler, ok = unmarshalers[prefix].(LogsUnmarshalerWithEnc)
if !ok {
return nil, errUnrecognizedEncoding
}
}

if unmarshalerWithEnc, ok := unmarshaler.(LogsUnmarshalerWithEnc); ok {
// This should be called even when enc is an empty string to initialize the encoding.
unmarshaler, err := unmarshalerWithEnc.WithEnc(enc)
if err != nil {
return nil, err
}
return unmarshaler, nil
}

return unmarshaler, nil
}

func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error {
ctx, cancel := context.WithCancel(context.Background())
c.cancelConsumeLoop = cancel
Expand Down
Loading

0 comments on commit 42c87ac

Please sign in to comment.