Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update docs for built-in processors, improve filterfield processor #992

Merged
merged 8 commits into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions pkg/processor/procbuiltin/extractfield.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ func init() {
}

// ExtractFieldKey builds the following processor:
// - If the key is raw and has a schema attached, extract the field and use it
// to replace the entire key.
// - If the key is raw and has no schema, return an error (not supported).
// - If the key is raw, return an error (not supported yet).
// - If the key is structured, extract the field and use it to replace the
// entire key.
func ExtractFieldKey(config processor.Config) (processor.Interface, error) {
Expand Down
16 changes: 12 additions & 4 deletions pkg/processor/procbuiltin/filterfield.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,20 @@ func filterField(
if err != nil {
return record.Record{}, cerrors.Errorf("filterfield failed to parse path: %w", err)
}
match := jsonquery.FindOne(doc, filtercondition)
if match == nil {
matches, err := jsonquery.Query(doc, filtercondition)
if err != nil {
return record.Record{}, cerrors.Errorf("invalid XPath expression in 'condition': %w", err)
}

if matches == nil {
// check the filterexists query if one is set.
if filterexists != "" {
exists := jsonquery.Find(doc, filterexists)
exists, err := jsonquery.QueryAll(doc, filterexists)
if err != nil {
return record.Record{}, cerrors.Errorf("invalid XPath expression in 'exists': %w", err)
}
// if it matches, handle normal drop record behavior.
if exists == nil {
if len(exists) == 0 {
// if it doesn't match, defer to filternull behavior
switch filternull {
case "include":
Expand All @@ -140,6 +147,7 @@ func filterField(
return record.Record{}, processor.ErrSkipRecord
}

// filtercondition passed
// handle matches based on filtertype as normal
switch filtertype {
case "include":
Expand Down
63 changes: 49 additions & 14 deletions pkg/processor/procbuiltin/filterfield_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,23 @@ func TestFilterFieldKey_Process(t *testing.T) {
wantErr bool
err error
}{
{
name: "should return error on invalid condition",
config: processor.Config{
Settings: map[string]string{
"type": "include",
"condition": "////",
"fail": "include",
},
},
args: args{r: record.Record{
Key: record.StructuredData{
"id": "foo",
},
}},
want: record.Record{},
wantErr: true,
},
{
name: "should forward record on condition",
config: processor.Config{
Expand Down Expand Up @@ -304,12 +321,33 @@ func TestFilterFieldPayload_Process(t *testing.T) {
r record.Record
}
tests := []struct {
name string
args args
want record.Record
wantErr bool
err error
name string
args args
want record.Record
err error
}{
{
name: "should return error on invalid condition",
args: args{
r: record.Record{
Payload: record.Change{
Before: nil,
After: record.StructuredData{
"foo": "bar",
},
},
},
config: processor.Config{
Settings: map[string]string{
"type": "include",
"condition": "////",
"missingornull": "fail",
},
},
},
want: record.Record{},
err: cerrors.New("invalid XPath expression in 'condition': expression must evaluate to a node-set"),
},
{
name: "should forward record on condition",
args: args{
Expand All @@ -336,7 +374,6 @@ func TestFilterFieldPayload_Process(t *testing.T) {
},
},
},
wantErr: false,
},
{
name: "should drop record on condition",
Expand All @@ -355,9 +392,8 @@ func TestFilterFieldPayload_Process(t *testing.T) {
"condition": "foo > 1",
},
}},
want: record.Record{},
wantErr: true,
err: processor.ErrSkipRecord,
want: record.Record{},
err: processor.ErrSkipRecord,
},
{
name: "should drop record on missing key",
Expand All @@ -378,21 +414,20 @@ func TestFilterFieldPayload_Process(t *testing.T) {
"missingornull": "exclude",
},
}},
want: record.Record{},
wantErr: true,
err: processor.ErrSkipRecord,
want: record.Record{},
err: processor.ErrSkipRecord,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
underTest, err := FilterFieldPayload(tt.args.config)
assert.Ok(t, err)
got, err := underTest.Process(context.Background(), tt.args.r)
if (err != nil) != tt.wantErr {
if (err != nil) != (tt.err != nil) {
t.Errorf("FilterFieldPayload Error: %s - wanted: %s", err, tt.err)
return
}
if tt.wantErr {
if tt.err != nil {
if diff := cmp.Diff(tt.err.Error(), err.Error()); diff != "" {
t.Errorf("FilterFieldPayload() failed: [DIFF] %s", diff)
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/processor/procbuiltin/hoistfield.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ func init() {
}

// HoistFieldKey builds the following processor:
// - If the key is raw and has a schema attached, wrap it using the specified
// field name in a struct.
// - If the key is raw and has a schema attached, return an error (not supported yet).
// - If the key is raw and has no schema, transform it into structured data by
// creating a map with the hoisted field and raw data as the value.
// - If the key is structured, wrap it using the specified field name in a map.
Expand Down
4 changes: 1 addition & 3 deletions pkg/processor/procbuiltin/insertfield.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ func init() {
}

// InsertFieldKey builds the following processor:
// - If the key is raw and has a schema attached, insert the field(s) in the
// key data.
// - If the key is raw and has no schema, return an error (not supported).
// - If the key is raw, return an error (not supported yet).
// - If the key is structured, set the field(s) in the key data.
func InsertFieldKey(config processor.Config) (processor.Interface, error) {
return insertField(insertFieldKeyProcType, recordKeyGetSetter{}, config)
Expand Down
4 changes: 1 addition & 3 deletions pkg/processor/procbuiltin/maskfield.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ func init() {
}

// MaskFieldKey builds the following processor:
// - If the key is raw and has a schema attached, replace the field with the
// zero value of the fields type.
// - If the key is raw and has no schema, return an error (not supported).
// - If the key is raw, return an error (not supported yet).
// - If the key is structured, replace the field with the zero value of the
// field's type.
func MaskFieldKey(config processor.Config) (processor.Interface, error) {
Expand Down
5 changes: 2 additions & 3 deletions pkg/processor/procbuiltin/replacefield.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ func init() {
processor.GlobalBuilderRegistry.MustRegister(replaceFieldPayloadProcType, ReplaceFieldKey)
}

// ReplaceFieldKey builds a processor which replaces a field in the key in raw
// data with a schema or in structured data. Raw data without a schema is not
// supported. The processor can be controlled by 3 variables:
// ReplaceFieldKey builds a processor which replaces a field in a structured key.
// Raw data is not supported. The processor can be controlled by 3 variables:
// - "exclude" - is a comma separated list of fields that should be excluded
// from the processed record ("exclude" takes precedence over "include").
// - "include" - is a comma separated list of fields that should be included
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,41 @@ import (
)

const (
timestampConvertorKeyProcType = "timestampconvertorkey"
timestampConvertorPayloadProcType = "timestampconvertorpayload"
timestampConverterKeyProcType = "timestampconverterkey"
timestampConverterPayloadProcType = "timestampconverterpayload"

timestampConvertorConfigTargetType = "target.type"
timestampConvertorConfigField = "date"
timestampConvertorConfigFormat = "format"
timestampConverterConfigTargetType = "target.type"
timestampConverterConfigField = "date"
timestampConverterConfigFormat = "format"
)

func init() {
processor.GlobalBuilderRegistry.MustRegister(timestampConvertorKeyProcType, TimestampConvertorKey)
processor.GlobalBuilderRegistry.MustRegister(timestampConvertorPayloadProcType, TimestampConvertorPayload)
processor.GlobalBuilderRegistry.MustRegister(timestampConverterKeyProcType, TimestampConverterKey)
processor.GlobalBuilderRegistry.MustRegister(timestampConverterPayloadProcType, TimestampConverterPayload)
}

// TimestampConvertorKey todo
func TimestampConvertorKey(config processor.Config) (processor.Interface, error) {
return timestampConvertor(timestampConvertorKeyProcType, recordKeyGetSetter{}, config)
// TimestampConverterKey builds a processor which converts a timestamp in a field in the key
// into a different type. The supported types are:
// - "string"
// - "unix"
// - "time.Time".
//
// Any combination of the supported types is possible. For example, it's possible
// to convert from a Unix timestamp to Go's time.Time or to convert from a string
// to a Unix timestamp.
//
// The processor supports only structured data.
func TimestampConverterKey(config processor.Config) (processor.Interface, error) {
return timestampConverter(timestampConverterKeyProcType, recordKeyGetSetter{}, config)
}

// TimestampConvertorPayload todo
func TimestampConvertorPayload(config processor.Config) (processor.Interface, error) {
return timestampConvertor(timestampConvertorPayloadProcType, recordPayloadGetSetter{}, config)
// TimestampConverterPayload builds the same processor as TimestampConverterKey, except that
// it operates on the field Record.Payload.After.
func TimestampConverterPayload(config processor.Config) (processor.Interface, error) {
return timestampConverter(timestampConverterPayloadProcType, recordPayloadGetSetter{}, config)
}

func timestampConvertor(
func timestampConverter(
processorType string,
getSetter recordDataGetSetter,
config processor.Config,
Expand All @@ -66,16 +77,16 @@ func timestampConvertor(
)

// if field is empty then input is raw data
if field, err = getConfigFieldString(config, timestampConvertorConfigField); err != nil {
if field, err = getConfigFieldString(config, timestampConverterConfigField); err != nil {
return nil, cerrors.Errorf("%s: %w", processorType, err)
}
if targetType, err = getConfigFieldString(config, timestampConvertorConfigTargetType); err != nil {
if targetType, err = getConfigFieldString(config, timestampConverterConfigTargetType); err != nil {
return nil, cerrors.Errorf("%s: %w", processorType, err)
}
if targetType != stringType && targetType != unixType && targetType != timeType {
return nil, cerrors.Errorf("%s: targetType (%s) is not supported", processorType, targetType)
}
format = config.Settings[timestampConvertorConfigFormat] // can be empty
format = config.Settings[timestampConverterConfigFormat] // can be empty
if format == "" && targetType == stringType {
return nil, cerrors.Errorf("%s: format is needed to parse the output", processorType)
}
Expand Down
Loading