-
Notifications
You must be signed in to change notification settings - Fork 22
/
Copy pathwriter.go
192 lines (163 loc) · 4.97 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
package helper
import (
"context"
"encoding/json"
"fmt"
"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
)
// NewWriterConfig creates a new writer config
func NewWriterConfig(operatorID, operatorType string) WriterConfig {
return WriterConfig{
BasicConfig: NewBasicConfig(operatorID, operatorType),
}
}
// WriterConfig is the configuration of a writer operator.
type WriterConfig struct {
BasicConfig `yaml:",inline"`
OutputIDs OutputIDs `json:"output" yaml:"output"`
}
// Build will build a writer operator from the config.
func (c WriterConfig) Build(context operator.BuildContext) (WriterOperator, error) {
basicOperator, err := c.BasicConfig.Build(context)
if err != nil {
return WriterOperator{}, err
}
writer := WriterOperator{
OutputIDs: c.OutputIDs,
BasicOperator: basicOperator,
}
return writer, nil
}
// SetNamespace will namespace the output ids of the writer.
func (c *WriterConfig) SetNamespace(namespace string, exclusions ...string) {
c.BasicConfig.SetNamespace(namespace, exclusions...)
for i, outputID := range c.OutputIDs {
if CanNamespace(outputID, exclusions) {
c.OutputIDs[i] = AddNamespace(outputID, namespace)
}
}
}
// WriterOperator is an operator that can write to other operators.
type WriterOperator struct {
BasicOperator
OutputIDs OutputIDs
OutputOperators []operator.Operator
}
// Write will write an entry to the outputs of the operator.
func (w *WriterOperator) Write(ctx context.Context, e *entry.Entry) {
for i, operator := range w.OutputOperators {
if i == len(w.OutputOperators)-1 {
_ = operator.Process(ctx, e)
return
}
operator.Process(ctx, e.Copy())
}
}
// CanOutput always returns true for a writer operator.
func (w *WriterOperator) CanOutput() bool {
return true
}
// Outputs returns the outputs of the writer operator.
func (w *WriterOperator) Outputs() []operator.Operator {
return w.OutputOperators
}
// SetOutputs will set the outputs of the operator.
func (w *WriterOperator) SetOutputs(operators []operator.Operator) error {
outputOperators := make([]operator.Operator, 0)
for _, operatorID := range w.OutputIDs {
operator, ok := w.findOperator(operators, operatorID)
if !ok {
return fmt.Errorf("operator '%s' does not exist", operatorID)
}
if !operator.CanProcess() {
return fmt.Errorf("operator '%s' can not process entries", operatorID)
}
outputOperators = append(outputOperators, operator)
}
// No outputs have been set, so use the next configured operator
if len(w.OutputIDs) == 0 {
currentOperatorIndex := -1
for i, operator := range operators {
if operator.ID() == w.ID() {
currentOperatorIndex = i
break
}
}
if currentOperatorIndex == -1 {
return fmt.Errorf("unexpectedly could not find self in array of operators")
}
nextOperatorIndex := currentOperatorIndex + 1
if nextOperatorIndex == len(operators) {
return fmt.Errorf("cannot omit output for the last operator in the pipeline")
}
nextOperator := operators[nextOperatorIndex]
if !nextOperator.CanProcess() {
return fmt.Errorf("operator '%s' cannot process entries, but it was selected as a receiver because 'output' was omitted", nextOperator.ID())
}
outputOperators = append(outputOperators, nextOperator)
}
w.OutputOperators = outputOperators
return nil
}
// FindOperator will find an operator matching the supplied id.
func (w *WriterOperator) findOperator(operators []operator.Operator, operatorID string) (operator.Operator, bool) {
for _, operator := range operators {
if operator.ID() == operatorID {
return operator, true
}
}
return nil, false
}
// OutputIDs is a collection of operator IDs used as outputs.
type OutputIDs []string
// UnmarshalJSON will unmarshal a string or array of strings to OutputIDs.
func (o *OutputIDs) UnmarshalJSON(bytes []byte) error {
var value interface{}
err := json.Unmarshal(bytes, &value)
if err != nil {
return err
}
ids, err := o.fromInterface(value)
if err != nil {
return err
}
*o = ids
return nil
}
// UnmarshalYAML will unmarshal a string or array of strings to OutputIDs.
func (o *OutputIDs) UnmarshalYAML(unmarshal func(interface{}) error) error {
var value interface{}
err := unmarshal(&value)
if err != nil {
return err
}
ids, err := o.fromInterface(value)
if err != nil {
return err
}
*o = ids
return nil
}
// fromInterface will parse OutputIDs from a raw interface.
func (o *OutputIDs) fromInterface(value interface{}) (OutputIDs, error) {
if str, ok := value.(string); ok {
return OutputIDs{str}, nil
}
if array, ok := value.([]interface{}); ok {
return o.fromArray(array)
}
return nil, fmt.Errorf("value is not of type string or string array")
}
// fromArray will parse OutputIDs from a raw array.
func (o *OutputIDs) fromArray(array []interface{}) (OutputIDs, error) {
ids := OutputIDs{}
for _, rawValue := range array {
strValue, ok := rawValue.(string)
if !ok {
return nil, fmt.Errorf("value in array is not of type string")
}
ids = append(ids, strValue)
}
return ids, nil
}