diff --git a/pkg/processor/procbuiltin/extractfield.go b/pkg/processor/procbuiltin/extractfield.go index 72c0bcc42..55a53d14b 100644 --- a/pkg/processor/procbuiltin/extractfield.go +++ b/pkg/processor/procbuiltin/extractfield.go @@ -36,9 +36,7 @@ func init() { } // ExtractFieldKey builds the following processor: -// - If the key is raw and has a schema attached, extract the field and use it -// to replace the entire key. -// - If the key is raw and has no schema, return an error (not supported). +// - If the key is raw, return an error (not supported yet). // - If the key is structured, extract the field and use it to replace the // entire key. func ExtractFieldKey(config processor.Config) (processor.Interface, error) { diff --git a/pkg/processor/procbuiltin/filterfield.go b/pkg/processor/procbuiltin/filterfield.go index 3d05ae94d..9f4d0a8e6 100644 --- a/pkg/processor/procbuiltin/filterfield.go +++ b/pkg/processor/procbuiltin/filterfield.go @@ -118,13 +118,20 @@ func filterField( if err != nil { return record.Record{}, cerrors.Errorf("filterfield failed to parse path: %w", err) } - match := jsonquery.FindOne(doc, filtercondition) - if match == nil { + matches, err := jsonquery.Query(doc, filtercondition) + if err != nil { + return record.Record{}, cerrors.Errorf("invalid XPath expression in 'condition': %w", err) + } + + if matches == nil { // check the filterexists query if one is set. if filterexists != "" { - exists := jsonquery.Find(doc, filterexists) + exists, err := jsonquery.QueryAll(doc, filterexists) + if err != nil { + return record.Record{}, cerrors.Errorf("invalid XPath expression in 'exists': %w", err) + } // if it matches, handle normal drop record behavior. - if exists == nil { + if len(exists) == 0 { // if it doesn't match, defer to filternull behavior switch filternull { case "include": @@ -140,6 +147,7 @@ func filterField( return record.Record{}, processor.ErrSkipRecord } + // filtercondition passed // handle matches based on filtertype as normal switch filtertype { case "include": diff --git a/pkg/processor/procbuiltin/filterfield_test.go b/pkg/processor/procbuiltin/filterfield_test.go index d9ca52aa7..687337795 100644 --- a/pkg/processor/procbuiltin/filterfield_test.go +++ b/pkg/processor/procbuiltin/filterfield_test.go @@ -112,6 +112,23 @@ func TestFilterFieldKey_Process(t *testing.T) { wantErr bool err error }{ + { + name: "should return error on invalid condition", + config: processor.Config{ + Settings: map[string]string{ + "type": "include", + "condition": "////", + "fail": "include", + }, + }, + args: args{r: record.Record{ + Key: record.StructuredData{ + "id": "foo", + }, + }}, + want: record.Record{}, + wantErr: true, + }, { name: "should forward record on condition", config: processor.Config{ @@ -304,12 +321,33 @@ func TestFilterFieldPayload_Process(t *testing.T) { r record.Record } tests := []struct { - name string - args args - want record.Record - wantErr bool - err error + name string + args args + want record.Record + err error }{ + { + name: "should return error on invalid condition", + args: args{ + r: record.Record{ + Payload: record.Change{ + Before: nil, + After: record.StructuredData{ + "foo": "bar", + }, + }, + }, + config: processor.Config{ + Settings: map[string]string{ + "type": "include", + "condition": "////", + "missingornull": "fail", + }, + }, + }, + want: record.Record{}, + err: cerrors.New("invalid XPath expression in 'condition': expression must evaluate to a node-set"), + }, { name: "should forward record on condition", args: args{ @@ -336,7 +374,6 @@ func TestFilterFieldPayload_Process(t *testing.T) { }, }, }, - wantErr: false, }, { name: "should drop record on condition", @@ -355,9 +392,8 @@ func TestFilterFieldPayload_Process(t *testing.T) { "condition": "foo > 1", }, }}, - want: record.Record{}, - wantErr: true, - err: processor.ErrSkipRecord, + want: record.Record{}, + err: processor.ErrSkipRecord, }, { name: "should drop record on missing key", @@ -378,9 +414,8 @@ func TestFilterFieldPayload_Process(t *testing.T) { "missingornull": "exclude", }, }}, - want: record.Record{}, - wantErr: true, - err: processor.ErrSkipRecord, + want: record.Record{}, + err: processor.ErrSkipRecord, }, } for _, tt := range tests { @@ -388,11 +423,11 @@ func TestFilterFieldPayload_Process(t *testing.T) { underTest, err := FilterFieldPayload(tt.args.config) assert.Ok(t, err) got, err := underTest.Process(context.Background(), tt.args.r) - if (err != nil) != tt.wantErr { + if (err != nil) != (tt.err != nil) { t.Errorf("FilterFieldPayload Error: %s - wanted: %s", err, tt.err) return } - if tt.wantErr { + if tt.err != nil { if diff := cmp.Diff(tt.err.Error(), err.Error()); diff != "" { t.Errorf("FilterFieldPayload() failed: [DIFF] %s", diff) } diff --git a/pkg/processor/procbuiltin/hoistfield.go b/pkg/processor/procbuiltin/hoistfield.go index 4ea4574a8..5f8ec9900 100644 --- a/pkg/processor/procbuiltin/hoistfield.go +++ b/pkg/processor/procbuiltin/hoistfield.go @@ -35,8 +35,7 @@ func init() { } // HoistFieldKey builds the following processor: -// - If the key is raw and has a schema attached, wrap it using the specified -// field name in a struct. +// - If the key is raw and has a schema attached, return an error (not supported yet). // - If the key is raw and has no schema, transforms it into structured data by // creating a map with the hoisted field and raw data as the value. // - If the key is structured, wrap it using the specified field name in a map. diff --git a/pkg/processor/procbuiltin/insertfield.go b/pkg/processor/procbuiltin/insertfield.go index bd07e55aa..bb8f21584 100644 --- a/pkg/processor/procbuiltin/insertfield.go +++ b/pkg/processor/procbuiltin/insertfield.go @@ -37,9 +37,7 @@ func init() { } // InsertFieldKey builds the following processor: -// - If the key is raw and has a schema attached, insert the field(s) in the -// key data. -// - If the key is raw and has no schema, return an error (not supported). +// - If the key is raw, return an error (not supported yet). // - If the key is structured, set the field(s) in the key data. func InsertFieldKey(config processor.Config) (processor.Interface, error) { return insertField(insertFieldKeyProcType, recordKeyGetSetter{}, config) diff --git a/pkg/processor/procbuiltin/maskfield.go b/pkg/processor/procbuiltin/maskfield.go index 48e443af8..535d92320 100644 --- a/pkg/processor/procbuiltin/maskfield.go +++ b/pkg/processor/procbuiltin/maskfield.go @@ -38,9 +38,7 @@ func init() { } // MaskFieldKey builds the following processor: -// - If the key is raw and has a schema attached, replace the field with the -// zero value of the fields type. -// - If the key is raw and has no schema, return an error (not supported). +// - If the key is raw, return an error (not supported yet). // - If the key is structured, replace the field with the zero value of the // fields type. func MaskFieldKey(config processor.Config) (processor.Interface, error) { diff --git a/pkg/processor/procbuiltin/replacefield.go b/pkg/processor/procbuiltin/replacefield.go index 0aca61e18..6ead1fc8b 100644 --- a/pkg/processor/procbuiltin/replacefield.go +++ b/pkg/processor/procbuiltin/replacefield.go @@ -37,9 +37,8 @@ func init() { processor.GlobalBuilderRegistry.MustRegister(replaceFieldPayloadProcType, ReplaceFieldKey) } -// ReplaceFieldKey builds a processor which replaces a field in the key in raw -// data with a schema or in structured data. Raw data without a schema is not -// supported. The processor can be controlled by 3 variables: +// ReplaceFieldKey builds a processor which replaces a field in a structured key. +// Raw data is not supported. The processor can be controlled by 3 variables: // - "exclude" - is a comma separated list of fields that should be excluded // from the processed record ("exclude" takes precedence over "include"). // - "include" - is a comma separated list of fields that should be included diff --git a/pkg/processor/procbuiltin/timestampconvertor.go b/pkg/processor/procbuiltin/timestampconverter.go similarity index 68% rename from pkg/processor/procbuiltin/timestampconvertor.go rename to pkg/processor/procbuiltin/timestampconverter.go index 3a5b17090..3d94c710b 100644 --- a/pkg/processor/procbuiltin/timestampconvertor.go +++ b/pkg/processor/procbuiltin/timestampconverter.go @@ -24,30 +24,41 @@ import ( ) const ( - timestampConvertorKeyProcType = "timestampconvertorkey" - timestampConvertorPayloadProcType = "timestampconvertorpayload" + timestampConverterKeyProcType = "timestampconverterkey" + timestampConverterPayloadProcType = "timestampconverterpayload" - timestampConvertorConfigTargetType = "target.type" - timestampConvertorConfigField = "date" - timestampConvertorConfigFormat = "format" + timestampConverterConfigTargetType = "target.type" + timestampConverterConfigField = "date" + timestampConverterConfigFormat = "format" ) func init() { - processor.GlobalBuilderRegistry.MustRegister(timestampConvertorKeyProcType, TimestampConvertorKey) - processor.GlobalBuilderRegistry.MustRegister(timestampConvertorPayloadProcType, TimestampConvertorPayload) + processor.GlobalBuilderRegistry.MustRegister(timestampConverterKeyProcType, TimestampConverterKey) + processor.GlobalBuilderRegistry.MustRegister(timestampConverterPayloadProcType, TimestampConverterPayload) } -// TimestampConvertorKey todo -func TimestampConvertorKey(config processor.Config) (processor.Interface, error) { - return timestampConvertor(timestampConvertorKeyProcType, recordKeyGetSetter{}, config) +// TimestampConverterKey builds a processor which converts a timestamp in a field in the key +// into a different type. The supported types are: +// - "string" +// - "unix" +// - "time.Time". +// +// Any combination of the supported types is possible. For example, it's possible +// to convert from a Unix timestamp to Go's time.Time or to convert from a string +// to a Unix timestamp. +// +// The processor supports only structured data. +func TimestampConverterKey(config processor.Config) (processor.Interface, error) { + return timestampConverter(timestampConverterKeyProcType, recordKeyGetSetter{}, config) } -// TimestampConvertorPayload todo -func TimestampConvertorPayload(config processor.Config) (processor.Interface, error) { - return timestampConvertor(timestampConvertorPayloadProcType, recordPayloadGetSetter{}, config) +// TimestampConverterPayload builds the same processor as TimestampConverterKey, except that +// it operates on the field Record.Payload.After. +func TimestampConverterPayload(config processor.Config) (processor.Interface, error) { + return timestampConverter(timestampConverterPayloadProcType, recordPayloadGetSetter{}, config) } -func timestampConvertor( +func timestampConverter( processorType string, getSetter recordDataGetSetter, config processor.Config, @@ -66,16 +77,16 @@ func timestampConvertor( ) // if field is empty then input is raw data - if field, err = getConfigFieldString(config, timestampConvertorConfigField); err != nil { + if field, err = getConfigFieldString(config, timestampConverterConfigField); err != nil { return nil, cerrors.Errorf("%s: %w", processorType, err) } - if targetType, err = getConfigFieldString(config, timestampConvertorConfigTargetType); err != nil { + if targetType, err = getConfigFieldString(config, timestampConverterConfigTargetType); err != nil { return nil, cerrors.Errorf("%s: %w", processorType, err) } if targetType != stringType && targetType != unixType && targetType != timeType { return nil, cerrors.Errorf("%s: targetType (%s) is not supported", processorType, targetType) } - format = config.Settings[timestampConvertorConfigFormat] // can be empty + format = config.Settings[timestampConverterConfigFormat] // can be empty if format == "" && targetType == stringType { return nil, cerrors.Errorf("%s: format is needed to parse the output", processorType) } diff --git a/pkg/processor/procbuiltin/timestampconvertor_test.go b/pkg/processor/procbuiltin/timestampconverter_test.go similarity index 73% rename from pkg/processor/procbuiltin/timestampconvertor_test.go rename to pkg/processor/procbuiltin/timestampconverter_test.go index c45cf9b01..4e30318f3 100644 --- a/pkg/processor/procbuiltin/timestampconvertor_test.go +++ b/pkg/processor/procbuiltin/timestampconverter_test.go @@ -26,7 +26,7 @@ import ( "github.com/google/go-cmp/cmp" ) -func TestTimestampConvertorKey_Build(t *testing.T) { +func TestTimestampConverterKey_Build(t *testing.T) { type args struct { config processor.Config } @@ -47,23 +47,23 @@ func TestTimestampConvertorKey_Build(t *testing.T) { }, { name: "empty field returns error", args: args{config: processor.Config{ - Settings: map[string]string{timestampConvertorConfigField: ""}, + Settings: map[string]string{timestampConverterConfigField: ""}, }}, wantErr: true, }, { name: "empty format returns error when targetType is string", args: args{config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "foo", - timestampConvertorConfigTargetType: "string"}, + timestampConverterConfigField: "foo", + timestampConverterConfigTargetType: "string"}, }}, wantErr: true, }, { name: "unix target type doesn't require a format", args: args{config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "foo", - timestampConvertorConfigTargetType: "unix", + timestampConverterConfigField: "foo", + timestampConverterConfigTargetType: "unix", }, }}, wantErr: false, @@ -71,9 +71,9 @@ func TestTimestampConvertorKey_Build(t *testing.T) { name: "time.Time target type doesn't require a format, unless input type is string", args: args{config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "foo", - timestampConvertorConfigTargetType: "time.Time", - timestampConvertorConfigFormat: "2016-01-02", + timestampConverterConfigField: "foo", + timestampConverterConfigTargetType: "time.Time", + timestampConverterConfigFormat: "2016-01-02", }, }}, wantErr: false, @@ -81,9 +81,9 @@ func TestTimestampConvertorKey_Build(t *testing.T) { name: "string targetType needs a format", args: args{config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "foo", - timestampConvertorConfigTargetType: "string", - timestampConvertorConfigFormat: "2016-01-02", + timestampConverterConfigField: "foo", + timestampConverterConfigTargetType: "string", + timestampConverterConfigFormat: "2016-01-02", }, }}, wantErr: false, @@ -91,16 +91,16 @@ func TestTimestampConvertorKey_Build(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, err := TimestampConvertorKey(tt.args.config) + _, err := TimestampConverterKey(tt.args.config) if (err != nil) != tt.wantErr { - t.Errorf("TimestampConvertorKey() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("TimestampConverterKey() error = %v, wantErr %v", err, tt.wantErr) return } }) } } -func TestTimestampConvertorKey_Process(t *testing.T) { +func TestTimestampConverterKey_Process(t *testing.T) { type args struct { r record.Record } @@ -114,9 +114,9 @@ func TestTimestampConvertorKey_Process(t *testing.T) { name: "from unix to string", config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "date", - timestampConvertorConfigTargetType: "string", - timestampConvertorConfigFormat: "2006-01-02", + timestampConverterConfigField: "date", + timestampConverterConfigTargetType: "string", + timestampConverterConfigFormat: "2006-01-02", }, }, args: args{r: record.Record{ @@ -134,9 +134,9 @@ func TestTimestampConvertorKey_Process(t *testing.T) { name: "from time.Time to string", config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "date", - timestampConvertorConfigTargetType: "string", - timestampConvertorConfigFormat: "2006-01-02", + timestampConverterConfigField: "date", + timestampConverterConfigTargetType: "string", + timestampConverterConfigFormat: "2006-01-02", }, }, args: args{r: record.Record{ @@ -154,9 +154,9 @@ func TestTimestampConvertorKey_Process(t *testing.T) { name: "from time.Time to unix", config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "date", - timestampConvertorConfigTargetType: "unix", - timestampConvertorConfigFormat: "", + timestampConverterConfigField: "date", + timestampConverterConfigTargetType: "unix", + timestampConverterConfigFormat: "", }, }, args: args{r: record.Record{ @@ -174,9 +174,9 @@ func TestTimestampConvertorKey_Process(t *testing.T) { name: "from string to unix", config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "date", - timestampConvertorConfigTargetType: "unix", - timestampConvertorConfigFormat: "2006-01-02", + timestampConverterConfigField: "date", + timestampConverterConfigTargetType: "unix", + timestampConverterConfigFormat: "2006-01-02", }, }, args: args{r: record.Record{ @@ -194,9 +194,10 @@ func TestTimestampConvertorKey_Process(t *testing.T) { name: "from string to time.Time", config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "date", - timestampConvertorConfigTargetType: "time.Time", - timestampConvertorConfigFormat: "2006-01-02"}, + timestampConverterConfigField: "date", + timestampConverterConfigTargetType: "time.Time", + timestampConverterConfigFormat: "2006-01-02", + }, }, args: args{r: record.Record{ Key: record.StructuredData{ @@ -213,9 +214,10 @@ func TestTimestampConvertorKey_Process(t *testing.T) { name: "from string to time.Time with empty format should throw error", config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "date", - timestampConvertorConfigTargetType: "time.Time", - timestampConvertorConfigFormat: ""}, + timestampConverterConfigField: "date", + timestampConverterConfigTargetType: "time.Time", + timestampConverterConfigFormat: "", + }, }, args: args{r: record.Record{ Key: record.StructuredData{ @@ -228,9 +230,10 @@ func TestTimestampConvertorKey_Process(t *testing.T) { name: "from string to unix with empty format should throw error", config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "date", - timestampConvertorConfigTargetType: "unix", - timestampConvertorConfigFormat: ""}, + timestampConverterConfigField: "date", + timestampConverterConfigTargetType: "unix", + timestampConverterConfigFormat: "", + }, }, args: args{r: record.Record{ Key: record.StructuredData{ @@ -243,9 +246,9 @@ func TestTimestampConvertorKey_Process(t *testing.T) { name: "from unix to time.Time", config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "date", - timestampConvertorConfigTargetType: "time.Time", - timestampConvertorConfigFormat: "", + timestampConverterConfigField: "date", + timestampConverterConfigTargetType: "time.Time", + timestampConverterConfigFormat: "", }, }, args: args{r: record.Record{ @@ -263,9 +266,9 @@ func TestTimestampConvertorKey_Process(t *testing.T) { name: "raw data without schema", config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "date", - timestampConvertorConfigTargetType: "string", - timestampConvertorConfigFormat: "2006-01-02", + timestampConverterConfigField: "date", + timestampConverterConfigTargetType: "string", + timestampConverterConfigFormat: "2006-01-02", }, }, args: args{r: record.Record{ @@ -279,8 +282,8 @@ func TestTimestampConvertorKey_Process(t *testing.T) { name: "raw data with schema", config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "foo", - timestampConvertorConfigTargetType: "unix", + timestampConverterConfigField: "foo", + timestampConverterConfigTargetType: "unix", }, }, args: args{r: record.Record{ @@ -294,7 +297,7 @@ func TestTimestampConvertorKey_Process(t *testing.T) { }} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - underTest, err := TimestampConvertorKey(tt.config) + underTest, err := TimestampConverterKey(tt.config) assert.Ok(t, err) got, err := underTest.Process(context.Background(), tt.args.r) if (err != nil) != tt.wantErr { @@ -308,7 +311,7 @@ func TestTimestampConvertorKey_Process(t *testing.T) { } } -func TestTimestampConvertorPayload_Build(t *testing.T) { +func TestTimestampConverterPayload_Build(t *testing.T) { type args struct { config processor.Config } @@ -329,31 +332,31 @@ func TestTimestampConvertorPayload_Build(t *testing.T) { }, { name: "empty field returns error", args: args{config: processor.Config{ - Settings: map[string]string{timestampConvertorConfigField: ""}, + Settings: map[string]string{timestampConverterConfigField: ""}, }}, wantErr: true, }, { name: "empty format returns error when targetType is string", args: args{config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "foo", - timestampConvertorConfigTargetType: "string", + timestampConverterConfigField: "foo", + timestampConverterConfigTargetType: "string", }, }}, wantErr: true, }} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - _, err := TimestampConvertorPayload(tt.args.config) + _, err := TimestampConverterPayload(tt.args.config) if (err != nil) != tt.wantErr { - t.Errorf("TimestampConvertorPayload() error = %v, wantErr %v", err, tt.wantErr) + t.Errorf("TimestampConverterPayload() error = %v, wantErr %v", err, tt.wantErr) return } }) } } -func TestTimestampConvertorPayload_Process(t *testing.T) { +func TestTimestampConverterPayload_Process(t *testing.T) { type args struct { r record.Record } @@ -367,9 +370,9 @@ func TestTimestampConvertorPayload_Process(t *testing.T) { name: "from unix to string", config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "date", - timestampConvertorConfigTargetType: "string", - timestampConvertorConfigFormat: "2006-01-02", + timestampConverterConfigField: "date", + timestampConverterConfigTargetType: "string", + timestampConverterConfigFormat: "2006-01-02", }, }, args: args{r: record.Record{ @@ -393,9 +396,9 @@ func TestTimestampConvertorPayload_Process(t *testing.T) { name: "from time.Time to string", config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "date", - timestampConvertorConfigTargetType: "string", - timestampConvertorConfigFormat: "2006-01-02", + timestampConverterConfigField: "date", + timestampConverterConfigTargetType: "string", + timestampConverterConfigFormat: "2006-01-02", }, }, args: args{r: record.Record{ @@ -419,9 +422,10 @@ func TestTimestampConvertorPayload_Process(t *testing.T) { name: "from time.Time to unix", config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "date", - timestampConvertorConfigTargetType: "unix", - timestampConvertorConfigFormat: ""}, + timestampConverterConfigField: "date", + timestampConverterConfigTargetType: "unix", + timestampConverterConfigFormat: "", + }, }, args: args{r: record.Record{ Payload: record.Change{ @@ -444,9 +448,9 @@ func TestTimestampConvertorPayload_Process(t *testing.T) { name: "from string to unix", config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "date", - timestampConvertorConfigTargetType: "unix", - timestampConvertorConfigFormat: "2006-01-02", + timestampConverterConfigField: "date", + timestampConverterConfigTargetType: "unix", + timestampConverterConfigFormat: "2006-01-02", }, }, args: args{r: record.Record{ @@ -470,9 +474,9 @@ func TestTimestampConvertorPayload_Process(t *testing.T) { name: "from string to time.Time", config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "date", - timestampConvertorConfigTargetType: "time.Time", - timestampConvertorConfigFormat: "2006-01-02", + timestampConverterConfigField: "date", + timestampConverterConfigTargetType: "time.Time", + timestampConverterConfigFormat: "2006-01-02", }, }, args: args{r: record.Record{ @@ -496,9 +500,9 @@ func TestTimestampConvertorPayload_Process(t *testing.T) { name: "from string to time.Time with empty format should throw error", config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "date", - timestampConvertorConfigTargetType: "time.Time", - timestampConvertorConfigFormat: "", + timestampConverterConfigField: "date", + timestampConverterConfigTargetType: "time.Time", + timestampConverterConfigFormat: "", }, }, args: args{r: record.Record{ @@ -515,9 +519,9 @@ func TestTimestampConvertorPayload_Process(t *testing.T) { name: "from string to unix with empty format should throw error", config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "date", - timestampConvertorConfigTargetType: "unix", - timestampConvertorConfigFormat: "", + timestampConverterConfigField: "date", + timestampConverterConfigTargetType: "unix", + timestampConverterConfigFormat: "", }, }, args: args{r: record.Record{ @@ -534,9 +538,9 @@ func TestTimestampConvertorPayload_Process(t *testing.T) { name: "from unix to time.Time", config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "date", - timestampConvertorConfigTargetType: "time.Time", - timestampConvertorConfigFormat: "", + timestampConverterConfigField: "date", + timestampConverterConfigTargetType: "time.Time", + timestampConverterConfigFormat: "", }, }, args: args{r: record.Record{ @@ -560,9 +564,9 @@ func TestTimestampConvertorPayload_Process(t *testing.T) { name: "raw data without schema", config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "date", - timestampConvertorConfigTargetType: "string", - timestampConvertorConfigFormat: "2006-01-02"}, + timestampConverterConfigField: "date", + timestampConverterConfigTargetType: "string", + timestampConverterConfigFormat: "2006-01-02"}, }, args: args{r: record.Record{ Payload: record.Change{ @@ -578,8 +582,8 @@ func TestTimestampConvertorPayload_Process(t *testing.T) { name: "raw data with schema", config: processor.Config{ Settings: map[string]string{ - timestampConvertorConfigField: "foo", - timestampConvertorConfigTargetType: "unix", + timestampConverterConfigField: "foo", + timestampConverterConfigTargetType: "unix", }, }, args: args{r: record.Record{ @@ -596,7 +600,7 @@ func TestTimestampConvertorPayload_Process(t *testing.T) { }} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - underTest, err := TimestampConvertorPayload(tt.config) + underTest, err := TimestampConverterPayload(tt.config) assert.Ok(t, err) got, err := underTest.Process(context.Background(), tt.args.r) if (err != nil) != tt.wantErr { diff --git a/pkg/processor/procbuiltin/valuetokey.go b/pkg/processor/procbuiltin/valuetokey.go index 802856cc8..db2e0fdb7 100644 --- a/pkg/processor/procbuiltin/valuetokey.go +++ b/pkg/processor/procbuiltin/valuetokey.go @@ -34,11 +34,9 @@ func init() { // ValueToKey builds a processor that replaces the record key with a new key // formed from a subset of fields in the record value. -// - If Payload.After is raw and has a schema attached, the created key will -// also have a schema with a subset of fields. // - If Payload.After is structured, the created key will also be structured // with a subset of fields. -// - If payload.After is raw and has no schema, return an error. +// - If Payload.After is raw, return an error (not supported yet). func ValueToKey(config processor.Config) (processor.Interface, error) { if config.Settings[valueToKeyConfigFields] == "" { return nil, cerrors.Errorf("%s: unspecified field %q", valueToKeyProcType, valueToKeyConfigFields)