diff --git a/receiver/fluentforwardreceiver/README.md b/receiver/fluentforwardreceiver/README.md index b23916c24eb9..2d84268e6724 100644 --- a/receiver/fluentforwardreceiver/README.md +++ b/receiver/fluentforwardreceiver/README.md @@ -18,6 +18,12 @@ This receiver: - If using TCP, it will start a UDP server on the same port to deliver heartbeat echos, as per the spec. +Fluentforward doesn't have a native notion of log body, severity, resources and so on. +We could considere evething as structured body or attributes. Attributes has been choosed to ease processors manipulation. +With `mappings` config you can move a fluent attribute to a body attribute or as severity text. +In this case, all found attributes will be moved. +If you set body_encoding to map it will not be serialized. + Here is a basic example config that makes the receiver listen on all interfaces on port 8006: @@ -25,6 +31,13 @@ on port 8006: receivers: fluentforward: endpoint: 0.0.0.0:8006 + body_encoding: otel + mappings: + body: + - log + - message + severity: + - severity ``` diff --git a/receiver/fluentforwardreceiver/config.go b/receiver/fluentforwardreceiver/config.go index afe02fe66391..b2ea5db20198 100644 --- a/receiver/fluentforwardreceiver/config.go +++ b/receiver/fluentforwardreceiver/config.go @@ -14,7 +14,11 @@ package fluentforwardreceiver -import "go.opentelemetry.io/collector/config" +import ( + "errors" + + "go.opentelemetry.io/collector/config" +) // Config defines configuration for the SignalFx receiver. type Config struct { @@ -24,4 +28,35 @@ type Config struct { // of the form `:` (TCP) or `unix://` (Unix // domain socket). ListenAddress string `mapstructure:"endpoint"` + + Mappings MappingWhitelist `mapstructure:"mappings"` + + BodyAsString string `mapstructure:"body_encoding"` +} + +func (c *Config) validate() error { + if c.ListenAddress == "" { + return errors.New("`endpoint` not specified") + } + + if c.BodyAsString == "" { + c.BodyAsString = "otel" + } + if !Contains([]string{"otel", "map"}, c.BodyAsString) { + return errors.New("Invalid body_encoding") + } + + if len(c.Mappings.Body) == 0 { + c.Mappings.Body = []string{"log", "message"} + } + return nil +} + +// MappingWhitelist defines the fluent attributes mapping configuration +type MappingWhitelist struct { + // Body is the list of fluent attributes ok to become the OTEL body + Body []string `mapstructure:"body"` + + // Severity is the list of fluent attributes ok to become the OTEL SeverityText + Severity []string `mapstructure:"severity"` } diff --git a/receiver/fluentforwardreceiver/conversion.go b/receiver/fluentforwardreceiver/conversion.go index 3832ddeb0e5b..518473c8cf88 100644 --- a/receiver/fluentforwardreceiver/conversion.go +++ b/receiver/fluentforwardreceiver/conversion.go @@ -33,7 +33,7 @@ const tagAttributeKey = "fluent.tag" // which describes the fields in much greater detail. type Event interface { - DecodeMsg(dc *msgp.Reader) error + DecodeMsg(dc *msgp.Reader, conf *Config) error LogRecords() pdata.LogSlice Chunk() string Compressed() string @@ -135,8 +135,10 @@ func decodeTimestampToLogRecord(dc *msgp.Reader, lr pdata.LogRecord) error { return nil } -func parseRecordToLogRecord(dc *msgp.Reader, lr pdata.LogRecord) error { +func parseRecordToLogRecord(dc *msgp.Reader, lr pdata.LogRecord, conf *Config) error { attrs := lr.Attributes() + bodyMap := lr.Body().MapVal() + lr.SetName("fluent[d|bit]") recordLen, err := dc.ReadMapHeader() if err != nil { @@ -160,25 +162,52 @@ func parseRecordToLogRecord(dc *msgp.Reader, lr pdata.LogRecord) error { return msgp.WrapError(err, "Record", key) } - // fluentd uses message, fluentbit log. - if key == "message" || key == "log" { - switch v := val.(type) { - case string: - lr.Body().SetStringVal(v) - case []uint8: - // Sometimes strings come in as uint8's. - lr.Body().SetStringVal(string(v)) - default: - return fmt.Errorf("cannot convert message type %T to string", val) + if Contains(conf.Mappings.Body, key) { + insertToAttributeMap(key, val, &bodyMap) + } else if Contains(conf.Mappings.Severity, key) { + s, err := ValueToString(val) + if err != nil { + return err } + lr.SetSeverityText(s) } else { insertToAttributeMap(key, val, &attrs) } } + if conf.BodyAsString == "otel" { + lr.Body().SetStringVal(lr.Body().AsString()) + } return nil } +func ValueToString(val interface{}) (string, error) { + switch v := val.(type) { + case string: + return v, nil + case map[string]interface{}, []interface{}: + encoded, err := json.Marshal(v) + if err != nil { + return "", err + } + return string(encoded), nil + case []byte: + // Sometimes strings come in as bytes array. + return string(v), nil + default: + return "", fmt.Errorf("cannot convert message type %T to string", val) + } +} + +func Contains(slice []string, element string) bool { + for _, val := range slice { + if val == element { + return true + } + } + return false +} + type MessageEventLogRecord struct { pdata.LogSlice OptionsMap @@ -188,7 +217,7 @@ func (melr *MessageEventLogRecord) LogRecords() pdata.LogSlice { return melr.LogSlice } -func (melr *MessageEventLogRecord) DecodeMsg(dc *msgp.Reader) error { +func (melr *MessageEventLogRecord) DecodeMsg(dc *msgp.Reader, conf *Config) error { melr.LogSlice = pdata.NewLogSlice() log := melr.LogSlice.AppendEmpty() @@ -216,7 +245,7 @@ func (melr *MessageEventLogRecord) DecodeMsg(dc *msgp.Reader) error { return msgp.WrapError(err, "Time") } - err = parseRecordToLogRecord(dc, log) + err = parseRecordToLogRecord(dc, log, conf) if err != nil { return err } @@ -262,7 +291,7 @@ func (fe *ForwardEventLogRecords) LogRecords() pdata.LogSlice { return fe.LogSlice } -func (fe *ForwardEventLogRecords) DecodeMsg(dc *msgp.Reader) (err error) { +func (fe *ForwardEventLogRecords) DecodeMsg(dc *msgp.Reader, conf *Config) (err error) { fe.LogSlice = pdata.NewLogSlice() var arrLen uint32 @@ -291,7 +320,7 @@ func (fe *ForwardEventLogRecords) DecodeMsg(dc *msgp.Reader) (err error) { for i := 0; i < int(entryLen); i++ { lr := fe.LogSlice.AppendEmpty() - err = parseEntryToLogRecord(dc, lr) + err = parseEntryToLogRecord(dc, lr, conf) if err != nil { return msgp.WrapError(err, "Entries", i) } @@ -308,7 +337,7 @@ func (fe *ForwardEventLogRecords) DecodeMsg(dc *msgp.Reader) (err error) { return } -func parseEntryToLogRecord(dc *msgp.Reader, lr pdata.LogRecord) error { +func parseEntryToLogRecord(dc *msgp.Reader, lr pdata.LogRecord, conf *Config) error { arrLen, err := dc.ReadArrayHeader() if err != nil { return msgp.WrapError(err) @@ -322,12 +351,13 @@ func parseEntryToLogRecord(dc *msgp.Reader, lr pdata.LogRecord) error { return msgp.WrapError(err, "Time") } - return parseRecordToLogRecord(dc, lr) + return parseRecordToLogRecord(dc, lr, conf) } type PackedForwardEventLogRecords struct { pdata.LogSlice OptionsMap + conf *Config } func (pfe *PackedForwardEventLogRecords) LogRecords() pdata.LogSlice { @@ -336,8 +366,9 @@ func (pfe *PackedForwardEventLogRecords) LogRecords() pdata.LogSlice { // DecodeMsg implements msgp.Decodable. This was originally code generated but // then manually copied here in order to handle the optional Options field. -func (pfe *PackedForwardEventLogRecords) DecodeMsg(dc *msgp.Reader) error { +func (pfe *PackedForwardEventLogRecords) DecodeMsg(dc *msgp.Reader, conf *Config) error { pfe.LogSlice = pdata.NewLogSlice() + pfe.conf = conf arrLen, err := dc.ReadArrayHeader() if err != nil { @@ -412,7 +443,7 @@ func (pfe *PackedForwardEventLogRecords) parseEntries(entriesRaw []byte, isGzipp msgpReader := msgp.NewReader(reader) for { lr := pdata.NewLogRecord() - err := parseEntryToLogRecord(msgpReader, lr) + err := parseEntryToLogRecord(msgpReader, lr, pfe.conf) if err != nil { if msgp.Cause(err) == io.EOF { return nil diff --git a/receiver/fluentforwardreceiver/conversion_test.go b/receiver/fluentforwardreceiver/conversion_test.go index 2f49f03c7b4a..fe4d42db7b82 100644 --- a/receiver/fluentforwardreceiver/conversion_test.go +++ b/receiver/fluentforwardreceiver/conversion_test.go @@ -27,9 +27,13 @@ import ( func TestMessageEventConversion(t *testing.T) { eventBytes := parseHexDump("testdata/message-event") reader := msgp.NewReader(bytes.NewReader(eventBytes)) + config := &Config{ + ListenAddress: "127.0.0.1:0", + Mappings: MappingWhitelist{Severity: []string{"severity"}}, + } var event MessageEventLogRecord - err := event.DecodeMsg(reader) + err := event.DecodeMsg(reader, config) require.Nil(t, err) le := event.LogRecords().At(0) @@ -93,9 +97,13 @@ func TestAttributeTypeConversion(t *testing.T) { require.NoError(t, err) reader := msgp.NewReader(bytes.NewReader(b)) + config := &Config{ + ListenAddress: "127.0.0.1:0", + Mappings: MappingWhitelist{Severity: []string{"severity"}}, + } var event MessageEventLogRecord - err = event.DecodeMsg(reader) + err = event.DecodeMsg(reader, config) require.Nil(t, err) le := event.LogRecords().At(0) @@ -150,13 +158,17 @@ func TestMessageEventConversionWithErrors(t *testing.T) { b = msgp.AppendMapHeader(b, 1) b = msgp.AppendString(b, "a") b = msgp.AppendFloat64(b, 5.0) + config := &Config{ + ListenAddress: "127.0.0.1:0", + Mappings: MappingWhitelist{Severity: []string{"severity"}}, + } for i := 0; i < len(b)-1; i++ { t.Run(fmt.Sprintf("EOF at byte %d", i), func(t *testing.T) { reader := msgp.NewReader(bytes.NewReader(b[:i])) var event MessageEventLogRecord - err := event.DecodeMsg(reader) + err := event.DecodeMsg(reader, config) require.NotNil(t, err) }) } @@ -168,20 +180,24 @@ func TestMessageEventConversionWithErrors(t *testing.T) { reader := msgp.NewReader(bytes.NewReader(in)) var event MessageEventLogRecord - err := event.DecodeMsg(reader) + err := event.DecodeMsg(reader, config) require.NotNil(t, err) }) } func TestForwardEventConversionWithErrors(t *testing.T) { b := parseHexDump("testdata/forward-event") + config := &Config{ + ListenAddress: "127.0.0.1:0", + Mappings: MappingWhitelist{Severity: []string{"severity"}}, + } for i := 0; i < len(b)-1; i++ { t.Run(fmt.Sprintf("EOF at byte %d", i), func(t *testing.T) { reader := msgp.NewReader(bytes.NewReader(b[:i])) var event ForwardEventLogRecords - err := event.DecodeMsg(reader) + err := event.DecodeMsg(reader, config) require.NotNil(t, err) }) } @@ -189,13 +205,17 @@ func TestForwardEventConversionWithErrors(t *testing.T) { func TestPackedForwardEventConversionWithErrors(t *testing.T) { b := parseHexDump("testdata/forward-packed-compressed") + config := &Config{ + ListenAddress: "127.0.0.1:0", + Mappings: MappingWhitelist{Severity: []string{"severity"}}, + } for i := 0; i < len(b)-1; i++ { t.Run(fmt.Sprintf("EOF at byte %d", i), func(t *testing.T) { reader := msgp.NewReader(bytes.NewReader(b[:i])) var event PackedForwardEventLogRecords - err := event.DecodeMsg(reader) + err := event.DecodeMsg(reader, config) require.NotNil(t, err) }) } @@ -207,7 +227,7 @@ func TestPackedForwardEventConversionWithErrors(t *testing.T) { reader := msgp.NewReader(bytes.NewReader(in)) var event PackedForwardEventLogRecords - err := event.DecodeMsg(reader) + err := event.DecodeMsg(reader, config) require.NotNil(t, err) require.Contains(t, err.Error(), "gzip") print(err.Error()) diff --git a/receiver/fluentforwardreceiver/receiver.go b/receiver/fluentforwardreceiver/receiver.go index 2bae4125d996..0ddf1a07f53e 100644 --- a/receiver/fluentforwardreceiver/receiver.go +++ b/receiver/fluentforwardreceiver/receiver.go @@ -39,10 +39,11 @@ type fluentReceiver struct { func newFluentReceiver(logger *zap.Logger, conf *Config, next consumer.Logs) (component.LogsReceiver, error) { eventCh := make(chan Event, eventChannelLength) + conf.validate() collector := newCollector(eventCh, next, logger) - server := newServer(eventCh, logger) + server := newServer(eventCh, logger, conf) return &fluentReceiver{ collector: collector, diff --git a/receiver/fluentforwardreceiver/server.go b/receiver/fluentforwardreceiver/server.go index 5eb5dea580fb..9bd3c3448016 100644 --- a/receiver/fluentforwardreceiver/server.go +++ b/receiver/fluentforwardreceiver/server.go @@ -37,12 +37,14 @@ const readBufferSize = 10 * 1024 type server struct { outCh chan<- Event logger *zap.Logger + conf *Config } -func newServer(outCh chan<- Event, logger *zap.Logger) *server { +func newServer(outCh chan<- Event, logger *zap.Logger, conf *Config) *server { return &server{ outCh: outCh, logger: logger, + conf: conf, } } @@ -111,7 +113,7 @@ func (s *server) handleConn(ctx context.Context, conn net.Conn) error { panic("programmer bug in mode handling") } - err = event.DecodeMsg(reader) + err = event.DecodeMsg(reader, s.conf) if err != nil { if err != io.EOF { stats.Record(ctx, observ.FailedToParse.M(1))