Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to configure mappings from fluent attributes to otel fields #5173

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions receiver/fluentforwardreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,26 @@ This receiver:
- If using TCP, it will start a UDP server on the same port to deliver
heartbeat echos, as per the spec.

Fluentforward doesn't have a native notion of log body, severity, resources and so on.
We could considere evething as structured body or attributes. Attributes has been choosed to ease processors manipulation.
With `mappings` config you can move a fluent attribute to a body attribute or as severity text.
In this case, all found attributes will be moved.
If you set body_encoding to map it will not be serialized.

Here is a basic example config that makes the receiver listen on all interfaces
on port 8006:

```yaml
receivers:
fluentforward:
endpoint: 0.0.0.0:8006
body_encoding: otel
mappings:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If anyone has a better idea / strategy about these mapping I take it. It makes the job but I have the feeling that it's not ideal.
But what I think is the concept could be reused on other receivers where underlying technology doesn't fit with all OTEL concepts.

body:
- log
- message
severity:
- severity
```


Expand Down
37 changes: 36 additions & 1 deletion receiver/fluentforwardreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

package fluentforwardreceiver

import "go.opentelemetry.io/collector/config"
import (
"errors"

"go.opentelemetry.io/collector/config"
)

// Config defines configuration for the SignalFx receiver.
type Config struct {
Expand All @@ -24,4 +28,35 @@ type Config struct {
// of the form `<ip addr>:<port>` (TCP) or `unix://<socket_path>` (Unix
// domain socket).
ListenAddress string `mapstructure:"endpoint"`

Mappings MappingWhitelist `mapstructure:"mappings"`

BodyAsString string `mapstructure:"body_encoding"`
}

func (c *Config) validate() error {
if c.ListenAddress == "" {
return errors.New("`endpoint` not specified")
}

if c.BodyAsString == "" {
c.BodyAsString = "otel"
}
if !Contains([]string{"otel", "map"}, c.BodyAsString) {
return errors.New("Invalid body_encoding")
}

if len(c.Mappings.Body) == 0 {
c.Mappings.Body = []string{"log", "message"}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default value for retrocompatibility.
Both log or message will be considered as "body" value

}
return nil
}

// MappingWhitelist defines the fluent attributes mapping configuration
type MappingWhitelist struct {
// Body is the list of fluent attributes ok to become the OTEL body
Body []string `mapstructure:"body"`

// Severity is the list of fluent attributes ok to become the OTEL SeverityText
Severity []string `mapstructure:"severity"`
}
71 changes: 51 additions & 20 deletions receiver/fluentforwardreceiver/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const tagAttributeKey = "fluent.tag"
// which describes the fields in much greater detail.

type Event interface {
DecodeMsg(dc *msgp.Reader) error
DecodeMsg(dc *msgp.Reader, conf *Config) error
LogRecords() pdata.LogSlice
Chunk() string
Compressed() string
Expand Down Expand Up @@ -135,8 +135,10 @@ func decodeTimestampToLogRecord(dc *msgp.Reader, lr pdata.LogRecord) error {
return nil
}

func parseRecordToLogRecord(dc *msgp.Reader, lr pdata.LogRecord) error {
func parseRecordToLogRecord(dc *msgp.Reader, lr pdata.LogRecord, conf *Config) error {
attrs := lr.Attributes()
bodyMap := lr.Body().MapVal()
lr.SetName("fluent[d|bit]")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Arbitrary name to provide something in name (I think otel collector should at least show the good exemple)


recordLen, err := dc.ReadMapHeader()
if err != nil {
Expand All @@ -160,25 +162,52 @@ func parseRecordToLogRecord(dc *msgp.Reader, lr pdata.LogRecord) error {
return msgp.WrapError(err, "Record", key)
}

// fluentd uses message, fluentbit log.
if key == "message" || key == "log" {
switch v := val.(type) {
case string:
lr.Body().SetStringVal(v)
case []uint8:
// Sometimes strings come in as uint8's.
lr.Body().SetStringVal(string(v))
default:
return fmt.Errorf("cannot convert message type %T to string", val)
if Contains(conf.Mappings.Body, key) {
insertToAttributeMap(key, val, &bodyMap)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By simplicity, I try to do the best to convert a maximum of fields in the body, se I work on body attributemap instead of string value.

} else if Contains(conf.Mappings.Severity, key) {
s, err := ValueToString(val)
if err != nil {
return err
}
lr.SetSeverityText(s)
} else {
insertToAttributeMap(key, val, &attrs)
}
}
if conf.BodyAsString == "otel" {
lr.Body().SetStringVal(lr.Body().AsString())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the user prefers a string body, we convert it here.

This will change a little the actual behavior, for a basic use case where you convert the log field to body we will have something like {"log": "raw string"} (really not sure about the output format of lr.Body().AsString()) instead of raw string

Any concern about that or the encoding choice ?

}

return nil
}

func ValueToString(val interface{}) (string, error) {
switch v := val.(type) {
case string:
return v, nil
case map[string]interface{}, []interface{}:
encoded, err := json.Marshal(v)
if err != nil {
return "", err
}
return string(encoded), nil
case []byte:
// Sometimes strings come in as bytes array.
return string(v), nil
default:
return "", fmt.Errorf("cannot convert message type %T to string", val)
}
}

func Contains(slice []string, element string) bool {
for _, val := range slice {
if val == element {
return true
}
}
return false
}

type MessageEventLogRecord struct {
pdata.LogSlice
OptionsMap
Expand All @@ -188,7 +217,7 @@ func (melr *MessageEventLogRecord) LogRecords() pdata.LogSlice {
return melr.LogSlice
}

func (melr *MessageEventLogRecord) DecodeMsg(dc *msgp.Reader) error {
func (melr *MessageEventLogRecord) DecodeMsg(dc *msgp.Reader, conf *Config) error {
melr.LogSlice = pdata.NewLogSlice()
log := melr.LogSlice.AppendEmpty()

Expand Down Expand Up @@ -216,7 +245,7 @@ func (melr *MessageEventLogRecord) DecodeMsg(dc *msgp.Reader) error {
return msgp.WrapError(err, "Time")
}

err = parseRecordToLogRecord(dc, log)
err = parseRecordToLogRecord(dc, log, conf)
if err != nil {
return err
}
Expand Down Expand Up @@ -262,7 +291,7 @@ func (fe *ForwardEventLogRecords) LogRecords() pdata.LogSlice {
return fe.LogSlice
}

func (fe *ForwardEventLogRecords) DecodeMsg(dc *msgp.Reader) (err error) {
func (fe *ForwardEventLogRecords) DecodeMsg(dc *msgp.Reader, conf *Config) (err error) {
fe.LogSlice = pdata.NewLogSlice()

var arrLen uint32
Expand Down Expand Up @@ -291,7 +320,7 @@ func (fe *ForwardEventLogRecords) DecodeMsg(dc *msgp.Reader) (err error) {
for i := 0; i < int(entryLen); i++ {
lr := fe.LogSlice.AppendEmpty()

err = parseEntryToLogRecord(dc, lr)
err = parseEntryToLogRecord(dc, lr, conf)
if err != nil {
return msgp.WrapError(err, "Entries", i)
}
Expand All @@ -308,7 +337,7 @@ func (fe *ForwardEventLogRecords) DecodeMsg(dc *msgp.Reader) (err error) {
return
}

func parseEntryToLogRecord(dc *msgp.Reader, lr pdata.LogRecord) error {
func parseEntryToLogRecord(dc *msgp.Reader, lr pdata.LogRecord, conf *Config) error {
arrLen, err := dc.ReadArrayHeader()
if err != nil {
return msgp.WrapError(err)
Expand All @@ -322,12 +351,13 @@ func parseEntryToLogRecord(dc *msgp.Reader, lr pdata.LogRecord) error {
return msgp.WrapError(err, "Time")
}

return parseRecordToLogRecord(dc, lr)
return parseRecordToLogRecord(dc, lr, conf)
}

type PackedForwardEventLogRecords struct {
pdata.LogSlice
OptionsMap
conf *Config
}

func (pfe *PackedForwardEventLogRecords) LogRecords() pdata.LogSlice {
Expand All @@ -336,8 +366,9 @@ func (pfe *PackedForwardEventLogRecords) LogRecords() pdata.LogSlice {

// DecodeMsg implements msgp.Decodable. This was originally code generated but
// then manually copied here in order to handle the optional Options field.
func (pfe *PackedForwardEventLogRecords) DecodeMsg(dc *msgp.Reader) error {
func (pfe *PackedForwardEventLogRecords) DecodeMsg(dc *msgp.Reader, conf *Config) error {
pfe.LogSlice = pdata.NewLogSlice()
pfe.conf = conf

arrLen, err := dc.ReadArrayHeader()
if err != nil {
Expand Down Expand Up @@ -412,7 +443,7 @@ func (pfe *PackedForwardEventLogRecords) parseEntries(entriesRaw []byte, isGzipp
msgpReader := msgp.NewReader(reader)
for {
lr := pdata.NewLogRecord()
err := parseEntryToLogRecord(msgpReader, lr)
err := parseEntryToLogRecord(msgpReader, lr, pfe.conf)
if err != nil {
if msgp.Cause(err) == io.EOF {
return nil
Expand Down
34 changes: 27 additions & 7 deletions receiver/fluentforwardreceiver/conversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ import (
func TestMessageEventConversion(t *testing.T) {
eventBytes := parseHexDump("testdata/message-event")
reader := msgp.NewReader(bytes.NewReader(eventBytes))
config := &Config{
ListenAddress: "127.0.0.1:0",
Mappings: MappingWhitelist{Severity: []string{"severity"}},
}

var event MessageEventLogRecord
err := event.DecodeMsg(reader)
err := event.DecodeMsg(reader, config)
require.Nil(t, err)

le := event.LogRecords().At(0)
Expand Down Expand Up @@ -93,9 +97,13 @@ func TestAttributeTypeConversion(t *testing.T) {
require.NoError(t, err)

reader := msgp.NewReader(bytes.NewReader(b))
config := &Config{
ListenAddress: "127.0.0.1:0",
Mappings: MappingWhitelist{Severity: []string{"severity"}},
}

var event MessageEventLogRecord
err = event.DecodeMsg(reader)
err = event.DecodeMsg(reader, config)
require.Nil(t, err)

le := event.LogRecords().At(0)
Expand Down Expand Up @@ -150,13 +158,17 @@ func TestMessageEventConversionWithErrors(t *testing.T) {
b = msgp.AppendMapHeader(b, 1)
b = msgp.AppendString(b, "a")
b = msgp.AppendFloat64(b, 5.0)
config := &Config{
ListenAddress: "127.0.0.1:0",
Mappings: MappingWhitelist{Severity: []string{"severity"}},
}

for i := 0; i < len(b)-1; i++ {
t.Run(fmt.Sprintf("EOF at byte %d", i), func(t *testing.T) {
reader := msgp.NewReader(bytes.NewReader(b[:i]))

var event MessageEventLogRecord
err := event.DecodeMsg(reader)
err := event.DecodeMsg(reader, config)
require.NotNil(t, err)
})
}
Expand All @@ -168,34 +180,42 @@ func TestMessageEventConversionWithErrors(t *testing.T) {
reader := msgp.NewReader(bytes.NewReader(in))

var event MessageEventLogRecord
err := event.DecodeMsg(reader)
err := event.DecodeMsg(reader, config)
require.NotNil(t, err)
})
}

func TestForwardEventConversionWithErrors(t *testing.T) {
b := parseHexDump("testdata/forward-event")
config := &Config{
ListenAddress: "127.0.0.1:0",
Mappings: MappingWhitelist{Severity: []string{"severity"}},
}

for i := 0; i < len(b)-1; i++ {
t.Run(fmt.Sprintf("EOF at byte %d", i), func(t *testing.T) {
reader := msgp.NewReader(bytes.NewReader(b[:i]))

var event ForwardEventLogRecords
err := event.DecodeMsg(reader)
err := event.DecodeMsg(reader, config)
require.NotNil(t, err)
})
}
}

func TestPackedForwardEventConversionWithErrors(t *testing.T) {
b := parseHexDump("testdata/forward-packed-compressed")
config := &Config{
ListenAddress: "127.0.0.1:0",
Mappings: MappingWhitelist{Severity: []string{"severity"}},
}

for i := 0; i < len(b)-1; i++ {
t.Run(fmt.Sprintf("EOF at byte %d", i), func(t *testing.T) {
reader := msgp.NewReader(bytes.NewReader(b[:i]))

var event PackedForwardEventLogRecords
err := event.DecodeMsg(reader)
err := event.DecodeMsg(reader, config)
require.NotNil(t, err)
})
}
Expand All @@ -207,7 +227,7 @@ func TestPackedForwardEventConversionWithErrors(t *testing.T) {
reader := msgp.NewReader(bytes.NewReader(in))

var event PackedForwardEventLogRecords
err := event.DecodeMsg(reader)
err := event.DecodeMsg(reader, config)
require.NotNil(t, err)
require.Contains(t, err.Error(), "gzip")
print(err.Error())
Expand Down
3 changes: 2 additions & 1 deletion receiver/fluentforwardreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ type fluentReceiver struct {

func newFluentReceiver(logger *zap.Logger, conf *Config, next consumer.Logs) (component.LogsReceiver, error) {
eventCh := make(chan Event, eventChannelLength)
conf.validate()

collector := newCollector(eventCh, next, logger)

server := newServer(eventCh, logger)
server := newServer(eventCh, logger, conf)

return &fluentReceiver{
collector: collector,
Expand Down
6 changes: 4 additions & 2 deletions receiver/fluentforwardreceiver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ const readBufferSize = 10 * 1024
type server struct {
outCh chan<- Event
logger *zap.Logger
conf *Config
}

func newServer(outCh chan<- Event, logger *zap.Logger) *server {
func newServer(outCh chan<- Event, logger *zap.Logger, conf *Config) *server {
return &server{
outCh: outCh,
logger: logger,
conf: conf,
}
}

Expand Down Expand Up @@ -111,7 +113,7 @@ func (s *server) handleConn(ctx context.Context, conn net.Conn) error {
panic("programmer bug in mode handling")
}

err = event.DecodeMsg(reader)
err = event.DecodeMsg(reader, s.conf)
if err != nil {
if err != io.EOF {
stats.Record(ctx, observ.FailedToParse.M(1))
Expand Down