Skip to content

Commit

Permalink
Update docs for built-in processors, improve filterfield processor (#992
Browse files Browse the repository at this point in the history
)
  • Loading branch information
hariso authored Apr 11, 2023
1 parent ede3570 commit 9541533
Show file tree
Hide file tree
Showing 10 changed files with 182 additions and 134 deletions.
4 changes: 1 addition & 3 deletions pkg/processor/procbuiltin/extractfield.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ func init() {
}

// ExtractFieldKey builds the following processor:
// - If the key is raw and has a schema attached, extract the field and use it
// to replace the entire key.
// - If the key is raw and has no schema, return an error (not supported).
// - If the key is raw, return an error (not supported yet).
// - If the key is structured, extract the field and use it to replace the
// entire key.
func ExtractFieldKey(config processor.Config) (processor.Interface, error) {
Expand Down
16 changes: 12 additions & 4 deletions pkg/processor/procbuiltin/filterfield.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,20 @@ func filterField(
if err != nil {
return record.Record{}, cerrors.Errorf("filterfield failed to parse path: %w", err)
}
match := jsonquery.FindOne(doc, filtercondition)
if match == nil {
matches, err := jsonquery.Query(doc, filtercondition)
if err != nil {
return record.Record{}, cerrors.Errorf("invalid XPath expression in 'condition': %w", err)
}

if matches == nil {
// check the filterexists query if one is set.
if filterexists != "" {
exists := jsonquery.Find(doc, filterexists)
exists, err := jsonquery.QueryAll(doc, filterexists)
if err != nil {
return record.Record{}, cerrors.Errorf("invalid XPath expression in 'exists': %w", err)
}
// if it matches, handle normal drop record behavior.
if exists == nil {
if len(exists) == 0 {
// if it doesn't match, defer to filternull behavior
switch filternull {
case "include":
Expand All @@ -140,6 +147,7 @@ func filterField(
return record.Record{}, processor.ErrSkipRecord
}

// filtercondition passed
// handle matches based on filtertype as normal
switch filtertype {
case "include":
Expand Down
63 changes: 49 additions & 14 deletions pkg/processor/procbuiltin/filterfield_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,23 @@ func TestFilterFieldKey_Process(t *testing.T) {
wantErr bool
err error
}{
{
name: "should return error on invalid condition",
config: processor.Config{
Settings: map[string]string{
"type": "include",
"condition": "////",
"fail": "include",
},
},
args: args{r: record.Record{
Key: record.StructuredData{
"id": "foo",
},
}},
want: record.Record{},
wantErr: true,
},
{
name: "should forward record on condition",
config: processor.Config{
Expand Down Expand Up @@ -304,12 +321,33 @@ func TestFilterFieldPayload_Process(t *testing.T) {
r record.Record
}
tests := []struct {
name string
args args
want record.Record
wantErr bool
err error
name string
args args
want record.Record
err error
}{
{
name: "should return error on invalid condition",
args: args{
r: record.Record{
Payload: record.Change{
Before: nil,
After: record.StructuredData{
"foo": "bar",
},
},
},
config: processor.Config{
Settings: map[string]string{
"type": "include",
"condition": "////",
"missingornull": "fail",
},
},
},
want: record.Record{},
err: cerrors.New("invalid XPath expression in 'condition': expression must evaluate to a node-set"),
},
{
name: "should forward record on condition",
args: args{
Expand All @@ -336,7 +374,6 @@ func TestFilterFieldPayload_Process(t *testing.T) {
},
},
},
wantErr: false,
},
{
name: "should drop record on condition",
Expand All @@ -355,9 +392,8 @@ func TestFilterFieldPayload_Process(t *testing.T) {
"condition": "foo > 1",
},
}},
want: record.Record{},
wantErr: true,
err: processor.ErrSkipRecord,
want: record.Record{},
err: processor.ErrSkipRecord,
},
{
name: "should drop record on missing key",
Expand All @@ -378,21 +414,20 @@ func TestFilterFieldPayload_Process(t *testing.T) {
"missingornull": "exclude",
},
}},
want: record.Record{},
wantErr: true,
err: processor.ErrSkipRecord,
want: record.Record{},
err: processor.ErrSkipRecord,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
underTest, err := FilterFieldPayload(tt.args.config)
assert.Ok(t, err)
got, err := underTest.Process(context.Background(), tt.args.r)
if (err != nil) != tt.wantErr {
if (err != nil) != (tt.err != nil) {
t.Errorf("FilterFieldPayload Error: %s - wanted: %s", err, tt.err)
return
}
if tt.wantErr {
if tt.err != nil {
if diff := cmp.Diff(tt.err.Error(), err.Error()); diff != "" {
t.Errorf("FilterFieldPayload() failed: [DIFF] %s", diff)
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/processor/procbuiltin/hoistfield.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ func init() {
}

// HoistFieldKey builds the following processor:
// - If the key is raw and has a schema attached, wrap it using the specified
// field name in a struct.
// - If the key is raw and has a schema attached, return an error (not supported yet).
// - If the key is raw and has no schema, transforms it into structured data by
// creating a map with the hoisted field and raw data as the value.
// - If the key is structured, wrap it using the specified field name in a map.
Expand Down
4 changes: 1 addition & 3 deletions pkg/processor/procbuiltin/insertfield.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ func init() {
}

// InsertFieldKey builds the following processor:
// - If the key is raw and has a schema attached, insert the field(s) in the
// key data.
// - If the key is raw and has no schema, return an error (not supported).
// - If the key is raw, return an error (not supported yet).
// - If the key is structured, set the field(s) in the key data.
func InsertFieldKey(config processor.Config) (processor.Interface, error) {
return insertField(insertFieldKeyProcType, recordKeyGetSetter{}, config)
Expand Down
4 changes: 1 addition & 3 deletions pkg/processor/procbuiltin/maskfield.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ func init() {
}

// MaskFieldKey builds the following processor:
// - If the key is raw and has a schema attached, replace the field with the
// zero value of the fields type.
// - If the key is raw and has no schema, return an error (not supported).
// - If the key is raw, return an error (not supported yet).
// - If the key is structured, replace the field with the zero value of the
// fields type.
func MaskFieldKey(config processor.Config) (processor.Interface, error) {
Expand Down
5 changes: 2 additions & 3 deletions pkg/processor/procbuiltin/replacefield.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ func init() {
processor.GlobalBuilderRegistry.MustRegister(replaceFieldPayloadProcType, ReplaceFieldKey)
}

// ReplaceFieldKey builds a processor which replaces a field in the key in raw
// data with a schema or in structured data. Raw data without a schema is not
// supported. The processor can be controlled by 3 variables:
// ReplaceFieldKey builds a processor which replaces a field in a structured key.
// Raw data is not supported. The processor can be controlled by 3 variables:
// - "exclude" - is a comma separated list of fields that should be excluded
// from the processed record ("exclude" takes precedence over "include").
// - "include" - is a comma separated list of fields that should be included
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,41 @@ import (
)

const (
timestampConvertorKeyProcType = "timestampconvertorkey"
timestampConvertorPayloadProcType = "timestampconvertorpayload"
timestampConverterKeyProcType = "timestampconverterkey"
timestampConverterPayloadProcType = "timestampconverterpayload"

timestampConvertorConfigTargetType = "target.type"
timestampConvertorConfigField = "date"
timestampConvertorConfigFormat = "format"
timestampConverterConfigTargetType = "target.type"
timestampConverterConfigField = "date"
timestampConverterConfigFormat = "format"
)

func init() {
processor.GlobalBuilderRegistry.MustRegister(timestampConvertorKeyProcType, TimestampConvertorKey)
processor.GlobalBuilderRegistry.MustRegister(timestampConvertorPayloadProcType, TimestampConvertorPayload)
processor.GlobalBuilderRegistry.MustRegister(timestampConverterKeyProcType, TimestampConverterKey)
processor.GlobalBuilderRegistry.MustRegister(timestampConverterPayloadProcType, TimestampConverterPayload)
}

// TimestampConvertorKey todo
func TimestampConvertorKey(config processor.Config) (processor.Interface, error) {
return timestampConvertor(timestampConvertorKeyProcType, recordKeyGetSetter{}, config)
// TimestampConverterKey builds a processor which converts a timestamp in a field in the key
// into a different type. The supported types are:
// - "string"
// - "unix"
// - "time.Time".
//
// Any combination of the supported types is possible. For example, it's possible
// to convert from a Unix timestamp to Go's time.Time or to convert from a string
// to a Unix timestamp.
//
// The processor supports only structured data.
func TimestampConverterKey(config processor.Config) (processor.Interface, error) {
return timestampConverter(timestampConverterKeyProcType, recordKeyGetSetter{}, config)
}

// TimestampConvertorPayload todo
func TimestampConvertorPayload(config processor.Config) (processor.Interface, error) {
return timestampConvertor(timestampConvertorPayloadProcType, recordPayloadGetSetter{}, config)
// TimestampConverterPayload builds the same processor as TimestampConverterKey, except that
// it operates on the field Record.Payload.After.
func TimestampConverterPayload(config processor.Config) (processor.Interface, error) {
return timestampConverter(timestampConverterPayloadProcType, recordPayloadGetSetter{}, config)
}

func timestampConvertor(
func timestampConverter(
processorType string,
getSetter recordDataGetSetter,
config processor.Config,
Expand All @@ -66,16 +77,16 @@ func timestampConvertor(
)

// if field is empty then input is raw data
if field, err = getConfigFieldString(config, timestampConvertorConfigField); err != nil {
if field, err = getConfigFieldString(config, timestampConverterConfigField); err != nil {
return nil, cerrors.Errorf("%s: %w", processorType, err)
}
if targetType, err = getConfigFieldString(config, timestampConvertorConfigTargetType); err != nil {
if targetType, err = getConfigFieldString(config, timestampConverterConfigTargetType); err != nil {
return nil, cerrors.Errorf("%s: %w", processorType, err)
}
if targetType != stringType && targetType != unixType && targetType != timeType {
return nil, cerrors.Errorf("%s: targetType (%s) is not supported", processorType, targetType)
}
format = config.Settings[timestampConvertorConfigFormat] // can be empty
format = config.Settings[timestampConverterConfigFormat] // can be empty
if format == "" && targetType == stringType {
return nil, cerrors.Errorf("%s: format is needed to parse the output", processorType)
}
Expand Down
Loading

0 comments on commit 9541533

Please sign in to comment.