-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ability to configure mappings from fluent attributes to otel fields #5173
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,11 @@ | |
|
||
package fluentforwardreceiver | ||
|
||
import "go.opentelemetry.io/collector/config" | ||
import ( | ||
"errors" | ||
|
||
"go.opentelemetry.io/collector/config" | ||
) | ||
|
||
// Config defines configuration for the SignalFx receiver. | ||
type Config struct { | ||
|
@@ -24,4 +28,32 @@ type Config struct { | |
// of the form `<ip addr>:<port>` (TCP) or `unix://<socket_path>` (Unix | ||
// domain socket). | ||
ListenAddress string `mapstructure:"endpoint"` | ||
|
||
Mappings MappingWhitelist `mapstructure:"mappings"` | ||
|
||
BodyAsString bool `mapstructure:"body_as_string"` | ||
} | ||
|
||
func (c *Config) validate() error { | ||
if c.ListenAddress == "" { | ||
return errors.New("`endpoint` not specified") | ||
} | ||
|
||
/*if c.BodyAsString == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How to test if the config is defined or not to have a default value ? |
||
c.BodyAsString = true | ||
}*/ | ||
|
||
if len(c.Mappings.Body) == 0 { | ||
c.Mappings.Body = []string{"log", "message"} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Default value for retrocompatibility. |
||
} | ||
return nil | ||
} | ||
|
||
// MappingWhitelist defines the fluent attributes mapping configuration | ||
type MappingWhitelist struct { | ||
// Body is the list of fluent attributes ok to become the OTEL body | ||
Body []string `mapstructure:"body"` | ||
|
||
// Severity is the list of fluent attributes ok to become the OTEL SeverityText | ||
Severity []string `mapstructure:"severity"` | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,7 +33,7 @@ const tagAttributeKey = "fluent.tag" | |
// which describes the fields in much greater detail. | ||
|
||
type Event interface { | ||
DecodeMsg(dc *msgp.Reader) error | ||
DecodeMsg(dc *msgp.Reader, conf *Config) error | ||
LogRecords() pdata.LogSlice | ||
Chunk() string | ||
Compressed() string | ||
|
@@ -135,8 +135,10 @@ func decodeTimestampToLogRecord(dc *msgp.Reader, lr pdata.LogRecord) error { | |
return nil | ||
} | ||
|
||
func parseRecordToLogRecord(dc *msgp.Reader, lr pdata.LogRecord) error { | ||
func parseRecordToLogRecord(dc *msgp.Reader, lr pdata.LogRecord, conf *Config) error { | ||
attrs := lr.Attributes() | ||
bodyMap := lr.Body().MapVal() | ||
lr.SetName("fluent[d|bit]") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Arbitrary name to provide something in name (I think otel collector should at least show the good exemple) |
||
|
||
recordLen, err := dc.ReadMapHeader() | ||
if err != nil { | ||
|
@@ -160,25 +162,53 @@ func parseRecordToLogRecord(dc *msgp.Reader, lr pdata.LogRecord) error { | |
return msgp.WrapError(err, "Record", key) | ||
} | ||
|
||
// fluentd uses message, fluentbit log. | ||
if key == "message" || key == "log" { | ||
switch v := val.(type) { | ||
case string: | ||
lr.Body().SetStringVal(v) | ||
case []uint8: | ||
// Sometimes strings come in as uint8's. | ||
lr.Body().SetStringVal(string(v)) | ||
default: | ||
return fmt.Errorf("cannot convert message type %T to string", val) | ||
} | ||
s, err := ValueToString(val) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To move into |
||
if err != nil { | ||
return err | ||
} | ||
|
||
if Contains(conf.Mappings.Body, key) { | ||
insertToAttributeMap(key, val, &bodyMap) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By simplicity, I try to do the best to convert a maximum of fields in the body, se I work on body attributemap instead of string value. |
||
} else if Contains(conf.Mappings.Severity, key) { | ||
lr.SetSeverityText(s) | ||
} else { | ||
insertToAttributeMap(key, val, &attrs) | ||
} | ||
} | ||
if conf.BodyAsString { | ||
lr.Body().SetStringVal(lr.Body().AsString()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the user prefers a string body, we convert it here. This will change a little the actual behavior, for a basic use case where you convert the Any concern about that or the encoding choice ? |
||
} | ||
|
||
return nil | ||
} | ||
|
||
func ValueToString(val interface{}) (string, error) { | ||
switch v := val.(type) { | ||
case string: | ||
return v, nil | ||
case map[string]interface{}, []interface{}: | ||
encoded, err := json.Marshal(v) | ||
if err != nil { | ||
return "", err | ||
} | ||
return string(encoded), nil | ||
case []byte: | ||
// Sometimes strings come in as bytes array. | ||
return string(v), nil | ||
default: | ||
return "", fmt.Errorf("cannot convert message type %T to string", val) | ||
} | ||
} | ||
|
||
func Contains(slice []string, element string) bool { | ||
for _, val := range slice { | ||
if val == element { | ||
return true | ||
} | ||
} | ||
return false | ||
} | ||
|
||
type MessageEventLogRecord struct { | ||
pdata.LogSlice | ||
OptionsMap | ||
|
@@ -188,7 +218,7 @@ func (melr *MessageEventLogRecord) LogRecords() pdata.LogSlice { | |
return melr.LogSlice | ||
} | ||
|
||
func (melr *MessageEventLogRecord) DecodeMsg(dc *msgp.Reader) error { | ||
func (melr *MessageEventLogRecord) DecodeMsg(dc *msgp.Reader, conf *Config) error { | ||
melr.LogSlice = pdata.NewLogSlice() | ||
log := melr.LogSlice.AppendEmpty() | ||
|
||
|
@@ -216,7 +246,7 @@ func (melr *MessageEventLogRecord) DecodeMsg(dc *msgp.Reader) error { | |
return msgp.WrapError(err, "Time") | ||
} | ||
|
||
err = parseRecordToLogRecord(dc, log) | ||
err = parseRecordToLogRecord(dc, log, conf) | ||
if err != nil { | ||
return err | ||
} | ||
|
@@ -262,7 +292,7 @@ func (fe *ForwardEventLogRecords) LogRecords() pdata.LogSlice { | |
return fe.LogSlice | ||
} | ||
|
||
func (fe *ForwardEventLogRecords) DecodeMsg(dc *msgp.Reader) (err error) { | ||
func (fe *ForwardEventLogRecords) DecodeMsg(dc *msgp.Reader, conf *Config) (err error) { | ||
fe.LogSlice = pdata.NewLogSlice() | ||
|
||
var arrLen uint32 | ||
|
@@ -291,7 +321,7 @@ func (fe *ForwardEventLogRecords) DecodeMsg(dc *msgp.Reader) (err error) { | |
for i := 0; i < int(entryLen); i++ { | ||
lr := fe.LogSlice.AppendEmpty() | ||
|
||
err = parseEntryToLogRecord(dc, lr) | ||
err = parseEntryToLogRecord(dc, lr, conf) | ||
if err != nil { | ||
return msgp.WrapError(err, "Entries", i) | ||
} | ||
|
@@ -308,7 +338,7 @@ func (fe *ForwardEventLogRecords) DecodeMsg(dc *msgp.Reader) (err error) { | |
return | ||
} | ||
|
||
func parseEntryToLogRecord(dc *msgp.Reader, lr pdata.LogRecord) error { | ||
func parseEntryToLogRecord(dc *msgp.Reader, lr pdata.LogRecord, conf *Config) error { | ||
arrLen, err := dc.ReadArrayHeader() | ||
if err != nil { | ||
return msgp.WrapError(err) | ||
|
@@ -322,12 +352,13 @@ func parseEntryToLogRecord(dc *msgp.Reader, lr pdata.LogRecord) error { | |
return msgp.WrapError(err, "Time") | ||
} | ||
|
||
return parseRecordToLogRecord(dc, lr) | ||
return parseRecordToLogRecord(dc, lr, conf) | ||
} | ||
|
||
type PackedForwardEventLogRecords struct { | ||
pdata.LogSlice | ||
OptionsMap | ||
conf *Config | ||
} | ||
|
||
func (pfe *PackedForwardEventLogRecords) LogRecords() pdata.LogSlice { | ||
|
@@ -336,8 +367,9 @@ func (pfe *PackedForwardEventLogRecords) LogRecords() pdata.LogSlice { | |
|
||
// DecodeMsg implements msgp.Decodable. This was originally code generated but | ||
// then manually copied here in order to handle the optional Options field. | ||
func (pfe *PackedForwardEventLogRecords) DecodeMsg(dc *msgp.Reader) error { | ||
func (pfe *PackedForwardEventLogRecords) DecodeMsg(dc *msgp.Reader, conf *Config) error { | ||
pfe.LogSlice = pdata.NewLogSlice() | ||
pfe.conf = conf | ||
|
||
arrLen, err := dc.ReadArrayHeader() | ||
if err != nil { | ||
|
@@ -412,7 +444,7 @@ func (pfe *PackedForwardEventLogRecords) parseEntries(entriesRaw []byte, isGzipp | |
msgpReader := msgp.NewReader(reader) | ||
for { | ||
lr := pdata.NewLogRecord() | ||
err := parseEntryToLogRecord(msgpReader, lr) | ||
err := parseEntryToLogRecord(msgpReader, lr, pfe.conf) | ||
if err != nil { | ||
if msgp.Cause(err) == io.EOF { | ||
return nil | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If anyone has a better idea / strategy about these mapping I take it. It makes the job but I have the feeling that it's not ideal.
But what I think is the concept could be reused on other receivers where underlying technology doesn't fit with all OTEL concepts.