diff --git a/docs/operators/forward_input.md b/docs/operators/forward_input.md deleted file mode 100644 index cf7f9db0..00000000 --- a/docs/operators/forward_input.md +++ /dev/null @@ -1,40 +0,0 @@ -## `foward_input` operator - -The `foward_input` operator receives logs from another Stanza instance running `forward_output`. - -### Configuration Fields - -| Field | Default | Description | -| --- | --- | --- | -| `id` | `forward_output` | A unique identifier for the operator | -| `listen_address` | `:80` | The IP address and port to listen on | -| `tls` | | A block for configuring the server to listen with TLS | - -#### TLS block configuration - -| Field | Default | Description | -| --- | --- | --- | -| `cert_file` | | The location of the certificate file | -| `key_file` | | The location of the key file | - - -### Example Configurations - -#### Simple configuration - -Configuration: -```yaml -- type: forward_input - listen_address: ":25535" -``` - -#### TLS configuration - -Configuration: -```yaml -- type: forward_input - listen_address: ":25535" - tls: - cert_file: /tmp/public.crt - key_file: /tmp/private.key -``` diff --git a/docs/operators/forward_output.md b/docs/operators/forward_output.md deleted file mode 100644 index bcdf0882..00000000 --- a/docs/operators/forward_output.md +++ /dev/null @@ -1,23 +0,0 @@ -## `forward_output` operator - -The `forward_output` operator sends logs to another Stanza instance running `forward_input`. - -### Configuration Fields - -| Field | Default | Description | -| --- | --- | --- | -| `id` | `forward_output` | A unique identifier for the operator | -| `address` | required | The address that the downstream Stanza instance is listening on | -| `buffer` | | A [buffer](/docs/types/buffer.md) block indicating how to buffer entries before flushing | -| `flusher` | | A [flusher](/docs/types/flusher.md) block configuring flushing behavior | - - -### Example Configurations - -#### Simple configuration - -Configuration: -```yaml -- type: forward_output - address: "http://downstream_server:25535" -``` diff --git a/operator/builtin/input/forward/forward.go b/operator/builtin/input/forward/forward.go deleted file mode 100644 index 43a17409..00000000 --- a/operator/builtin/input/forward/forward.go +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package forward - -import ( - "context" - "encoding/json" - "net" - "net/http" - - "github.com/open-telemetry/opentelemetry-log-collection/entry" - "github.com/open-telemetry/opentelemetry-log-collection/errors" - "github.com/open-telemetry/opentelemetry-log-collection/operator" - "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" - "go.uber.org/zap" -) - -func init() { - operator.Register("forward_input", func() operator.Builder { return NewForwardInputConfig("") }) -} - -// NewForwardInputConfig creates a new stdin input config with default values -func NewForwardInputConfig(operatorID string) *ForwardInputConfig { - return &ForwardInputConfig{ - InputConfig: helper.NewInputConfig(operatorID, "stdin"), - } -} - -// ForwardInputConfig is the configuration of a forward input operator -type ForwardInputConfig struct { - helper.InputConfig `yaml:",inline"` - ListenAddress string `json:"listen_address" yaml:"listen_address"` - TLS *TLSConfig `json:"tls" yaml:"tls"` -} - -// TLSConfig is a configuration struct for forward input TLS -type TLSConfig struct { - CertFile string `json:"cert_file" yaml:"cert_file"` - KeyFile string `json:"key_file" yaml:"key_file"` -} - -// Build will build a forward input operator. -func (c *ForwardInputConfig) Build(context operator.BuildContext) ([]operator.Operator, error) { - inputOperator, err := c.InputConfig.Build(context) - if err != nil { - return nil, err - } - - forwardInput := &ForwardInput{ - InputOperator: inputOperator, - tls: c.TLS, - } - - forwardInput.srv = &http.Server{ - Addr: c.ListenAddress, - Handler: forwardInput, - } - - return []operator.Operator{forwardInput}, nil -} - -// ForwardInput is an operator that reads input from stdin -type ForwardInput struct { - helper.InputOperator - - srv *http.Server - ln net.Listener - tls *TLSConfig -} - -// Start will start generating log entries. -func (f *ForwardInput) Start() error { - addr := f.srv.Addr - if addr == "" { - addr = ":http" - } - - ln, err := net.Listen("tcp", addr) - if err != nil { - return errors.Wrap(err, "start listener") - } - - // Save the listener so we can use a dynamic port for tests - f.ln = ln - - go func() { - if f.tls != nil { - err = f.srv.ServeTLS(ln, f.tls.CertFile, f.tls.KeyFile) - } else { - err = f.srv.Serve(ln) - } - if err != nil && err != http.ErrServerClosed { - f.Errorw("Serve error", zap.Error(err)) - } - }() - - return nil -} - -// Stop will stop generating logs. -func (f *ForwardInput) Stop() error { - return f.srv.Shutdown(context.Background()) -} - -func (f *ForwardInput) ServeHTTP(wr http.ResponseWriter, req *http.Request) { - dec := json.NewDecoder(req.Body) - - var entries []*entry.Entry - if err := dec.Decode(&entries); err != nil { - wr.WriteHeader(http.StatusBadRequest) - return - } - - for _, entry := range entries { - f.Write(req.Context(), entry) - } -} diff --git a/operator/builtin/input/forward/forward_test.go b/operator/builtin/input/forward/forward_test.go deleted file mode 100644 index ddf21a56..00000000 --- a/operator/builtin/input/forward/forward_test.go +++ /dev/null @@ -1,235 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package forward - -import ( - "bytes" - "crypto/tls" - "crypto/x509" - "encoding/json" - "fmt" - "net" - "net/http" - "os" - "path/filepath" - "testing" - "time" - - "github.com/open-telemetry/opentelemetry-log-collection/entry" - "github.com/open-telemetry/opentelemetry-log-collection/operator" - "github.com/open-telemetry/opentelemetry-log-collection/testutil" - "github.com/stretchr/testify/require" -) - -func TestForwardInput(t *testing.T) { - cfg := NewForwardInputConfig("test") - cfg.ListenAddress = "0.0.0.0:0" - cfg.OutputIDs = []string{"fake"} - - ops, err := cfg.Build(testutil.NewBuildContext(t)) - require.NoError(t, err) - forwardInput := ops[0].(*ForwardInput) - - fake := testutil.NewFakeOutput(t) - err = forwardInput.SetOutputs([]operator.Operator{fake}) - require.NoError(t, err) - - require.NoError(t, forwardInput.Start()) - defer forwardInput.Stop() - - newEntry := entry.New() - newEntry.Record = "test" - newEntry.Timestamp = newEntry.Timestamp.Round(time.Second) - var buf bytes.Buffer - enc := json.NewEncoder(&buf) - require.NoError(t, enc.Encode([]*entry.Entry{newEntry})) - - _, port, err := net.SplitHostPort(forwardInput.ln.Addr().String()) - require.NoError(t, err) - - _, err = http.Post(fmt.Sprintf("http://127.0.0.1:%s", port), "application/json", &buf) - require.NoError(t, err) - - select { - case <-time.After(time.Second): - require.FailNow(t, "Timed out waiting for entry to be received") - case e := <-fake.Received: - require.True(t, newEntry.Timestamp.Equal(e.Timestamp)) - require.Equal(t, newEntry.Record, e.Record) - require.Equal(t, newEntry.Severity, e.Severity) - require.Equal(t, newEntry.SeverityText, e.SeverityText) - require.Equal(t, newEntry.Labels, e.Labels) - require.Equal(t, newEntry.Resource, e.Resource) - } -} - -func TestForwardInputTLS(t *testing.T) { - certFile, keyFile := createCertFiles(t) - - cfg := NewForwardInputConfig("test") - cfg.ListenAddress = "0.0.0.0:0" - cfg.TLS = &TLSConfig{ - CertFile: certFile, - KeyFile: keyFile, - } - cfg.OutputIDs = []string{"fake"} - - ops, err := cfg.Build(testutil.NewBuildContext(t)) - require.NoError(t, err) - forwardInput := ops[0].(*ForwardInput) - - fake := testutil.NewFakeOutput(t) - err = forwardInput.SetOutputs([]operator.Operator{fake}) - require.NoError(t, err) - - require.NoError(t, forwardInput.Start()) - defer forwardInput.Stop() - - newEntry := entry.New() - newEntry.Record = "test" - newEntry.Timestamp = newEntry.Timestamp.Round(time.Second) - var buf bytes.Buffer - enc := json.NewEncoder(&buf) - require.NoError(t, enc.Encode([]*entry.Entry{newEntry})) - - _, port, err := net.SplitHostPort(forwardInput.ln.Addr().String()) - require.NoError(t, err) - - pool := x509.NewCertPool() - pool.AppendCertsFromPEM(publicCrt) - - client := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - RootCAs: pool, - }, - }, - } - - _, err = client.Post(fmt.Sprintf("https://127.0.0.1:%s", port), "application/json", &buf) - require.NoError(t, err) - - select { - case <-time.After(time.Second): - require.FailNow(t, "Timed out waiting for entry to be received") - case e := <-fake.Received: - require.True(t, newEntry.Timestamp.Equal(e.Timestamp)) - require.Equal(t, newEntry.Record, e.Record) - require.Equal(t, newEntry.Severity, e.Severity) - require.Equal(t, newEntry.SeverityText, e.SeverityText) - require.Equal(t, newEntry.Labels, e.Labels) - require.Equal(t, newEntry.Resource, e.Resource) - } -} - -func createCertFiles(t *testing.T) (cert, key string) { - tempDir := testutil.NewTempDir(t) - - certFile, err := os.Create(filepath.Join(tempDir, "cert")) - require.NoError(t, err) - _, err = certFile.Write(publicCrt) - require.NoError(t, err) - certFile.Close() - - keyFile, err := os.Create(filepath.Join(tempDir, "key")) - require.NoError(t, err) - _, err = keyFile.Write(privateKey) - require.NoError(t, err) - keyFile.Close() - - return certFile.Name(), keyFile.Name() -} - -/* - openssl req -x509 -nodes -newkey rsa:2048 -keyout key.pem -out cert.pem -config san.cnf - Generated with the following san.cnf: - - [req] - default_bits = 2048 - distinguished_name = req_distinguished_name - req_extensions = req_ext - x509_extensions = v3_req - prompt = no - - [req_distinguished_name] - countryName = XX - stateOrProvinceName = N/A - localityName = N/A - organizationName = Self-signed certificate - commonName = 120.0.0.1: Self-signed certificate - - [req_ext] - subjectAltName = @alt_names - - [v3_req] - subjectAltName = @alt_names - - [alt_names] - IP.1 = 127.0.0.1 -*/ -var privateKey = []byte(` ------BEGIN PRIVATE KEY----- -MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQDVYJKyB9jDamy5 -gAdRldyqdFfEoQmWqVFVsNcYVqkbLSagcwoi21p8YU4TMkC/vrkN/jHg9lOiwi5z -Uegt/52HG6cpaaAW3AVwA+4/Cwhi4a/dwUGBO80HMvMQ9RwH2ISLa1X0rWPxVT+l -VNQXxfrEU8Jwv80cDwpJUcywmGWuiiPAYMAZLnQjp3jAS/rbVBV4kWAOlgYstKg9 -XjXpssF+LaRGU9F3WMlPtaXuQpMAy54r5nRYVwYGkXmePqC3BQGqgIBkXiaCgSZ5 -VLFM5zKGCJVyNwOytsH9exTo/UbbQnClREaHU6q/lIa7yBiTLvbid3ck89xHnX3a -/K8/ODCXAgMBAAECggEBALSppOsp66VheZcCSLASRBjqktmAQ/8VczErnqMT1PCW -pQra/G0Q7qc7OADW3q26zTKE1DSWO7Al23B2nDA+KmGXz0woC4zvU4dJPLKSI9Kd -JeuLUmwadvkucVEdR1N5RphJFCkrmeBe/pl8nmtWjIEoLgyKyR6FuX7kzHuFPSqu -bdkeA91nJ9o1uO7xhAKRBV2tnHL87TvZpQ3fJkJYg4f6cvgaHR8hggGxWqXcBuqh -XMialoXWlYXMXWBrR8sI/NWVTJjX8GMnuOK+qIRb4eSXrMvicLyE/elbd4eOUx2N -WSdA0OeguYNyqgrnb2nsDdFRAmmXdP0mQNrH2CPyNEECgYEA6pi07tpvl+TyYwkj -BA9pvmYbO4LyNyne0ALYB9bA5H7uhYIZ6QhdDRq/3hADrb+3Zt7HDSljjp9D/jRL -tWysa/1N0DX0VDQMiq5q7qcMrNvq8yCuhwt1/y15fXrPMaKOqiO/aV+NhTISqXF4 -BtXSD957MV/MbBuaMHJdlp6NzGECgYEA6NhDhihz8Kmo/MyiSBT3QPJwaYPSpOZb -SWe/5jKUrgigPpCkWHjOp+D1vgRskzkBBsWPyuhNuUrKpXGv93RvrjtSWsjFMPwe -DKRXbpD2MpA/7LMmUUNnVln8X8gsEXwv0OlJXftAJWnfH+XT9GjsiVrKLO2Z+I3g -52YZaQ+QX/cCgYA/ZRDP0vuBAn91v8xUlo2uxAzr4hDuU0RA1ePnCmOJ27s9HNE/ -peDvX6ElsxIra7l19RG5PswGiIdpNFyZJErby9GxSENEVeRlvYhsAXxtYeh11wkS -uUgjsvg3rm47LYB7/bkGEqo9qjBc1arnvfRMEYUc7JRjSno6SU19HE+ZQQKBgGKO -h6ZVoR2Q8rJue5I/LZkUBXjkD1k1GBauD5AEgOJZTFqvJqE8IVz1346amMqIKmMP -ZJniUmPHwJbe1DjN1CfPfEBpEu51CNMZDNkECvHEFQq/mcxz1125oRV5yQ1tn1+y -HxfkrXYopgT+ZwThFJ3fDAyQVcfbZgMMOF079URpAoGAMLiz6MKWN2f53kXVL4IP -fnn/6s1SwpyCQVAcYwHez50bMjltPov5r3TGCuazWciOEG46mx38JcTwCktJdZr8 -fKYH4NM0PNLDSiOHjvLkujlDJrBs4NwLfABPDbW/2387mqtDYbNO+XfVBF85fuZI -+Xfm7rWrp93+rNxKX2+0A+U= ------END PRIVATE KEY----- -`) - -var publicCrt = []byte(` ------BEGIN CERTIFICATE----- -MIIDhjCCAm6gAwIBAgIJAJOeoSf2lHXyMA0GCSqGSIb3DQEBCwUAMHgxCzAJBgNV -BAYTAlhYMQwwCgYDVQQIDANOL0ExDDAKBgNVBAcMA04vQTEgMB4GA1UECgwXU2Vs -Zi1zaWduZWQgY2VydGlmaWNhdGUxKzApBgNVBAMMIjEyMC4wLjAuMTogU2VsZi1z -aWduZWQgY2VydGlmaWNhdGUwHhcNMjEwMTI2MTYxMTM0WhcNMjEwMjI1MTYxMTM0 -WjB4MQswCQYDVQQGEwJYWDEMMAoGA1UECAwDTi9BMQwwCgYDVQQHDANOL0ExIDAe -BgNVBAoMF1NlbGYtc2lnbmVkIGNlcnRpZmljYXRlMSswKQYDVQQDDCIxMjAuMC4w -LjE6IFNlbGYtc2lnbmVkIGNlcnRpZmljYXRlMIIBIjANBgkqhkiG9w0BAQEFAAOC -AQ8AMIIBCgKCAQEA1WCSsgfYw2psuYAHUZXcqnRXxKEJlqlRVbDXGFapGy0moHMK -IttafGFOEzJAv765Df4x4PZTosIuc1HoLf+dhxunKWmgFtwFcAPuPwsIYuGv3cFB -gTvNBzLzEPUcB9iEi2tV9K1j8VU/pVTUF8X6xFPCcL/NHA8KSVHMsJhlroojwGDA -GS50I6d4wEv621QVeJFgDpYGLLSoPV416bLBfi2kRlPRd1jJT7Wl7kKTAMueK+Z0 -WFcGBpF5nj6gtwUBqoCAZF4mgoEmeVSxTOcyhgiVcjcDsrbB/XsU6P1G20JwpURG -h1Oqv5SGu8gYky724nd3JPPcR5192vyvPzgwlwIDAQABoxMwETAPBgNVHREECDAG -hwR/AAABMA0GCSqGSIb3DQEBCwUAA4IBAQBZ1kXwQGlRV83H3V02CTN/P1hItlCk -n9PHGXJDiaLpqxCY2DPnly7jFouPPk/HGODVAYerrBaPMiteI9Fc+JedxgIADRsg -06YuXhn3qUEVBe5a6UJTA52zTXiOTyUHZmWxKbn5lchp1YRvdkLis59i4KmI6cQJ -a5+dDjw8n9PauYMKne/aielDlysBeQAZVRMvPsuMH/XJ5prLD1lq4Y1MEFEdOAsw -sFilmgeJ/BCDyBBlD4qDeAGhomMnMFb8Cm95Nrv/NreaDXn6gFVe/w3npVBp0ksl -Lh172El7qlPcY9yluZvoK8OK/hdUFanb49T0F5vQcJXeutntT6goJJM4 ------END CERTIFICATE----- -`) diff --git a/operator/builtin/output/forward/forward.go b/operator/builtin/output/forward/forward.go deleted file mode 100644 index a723d53a..00000000 --- a/operator/builtin/output/forward/forward.go +++ /dev/null @@ -1,188 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package forward - -import ( - "bytes" - "context" - "encoding/json" - "io/ioutil" - "net/http" - "sync" - - "github.com/open-telemetry/opentelemetry-log-collection/entry" - "github.com/open-telemetry/opentelemetry-log-collection/errors" - "github.com/open-telemetry/opentelemetry-log-collection/operator" - "github.com/open-telemetry/opentelemetry-log-collection/operator/buffer" - "github.com/open-telemetry/opentelemetry-log-collection/operator/flusher" - "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" - "go.uber.org/zap" -) - -func init() { - operator.Register("forward_output", func() operator.Builder { return NewForwardOutputConfig("") }) -} - -// NewForwardOutputConfig creates a new forward output config with default values -func NewForwardOutputConfig(operatorID string) *ForwardOutputConfig { - return &ForwardOutputConfig{ - OutputConfig: helper.NewOutputConfig(operatorID, "forward_output"), - BufferConfig: buffer.NewConfig(), - FlusherConfig: flusher.NewConfig(), - } -} - -// ForwardOutputConfig is the configuration of a forward output operator. -type ForwardOutputConfig struct { - helper.OutputConfig `yaml:",inline"` - BufferConfig buffer.Config `json:"buffer" yaml:"buffer"` - FlusherConfig flusher.Config `json:"flusher" yaml:"flusher"` - Address string `json:"address" yaml:"address"` -} - -// Build will build an forward output operator. -func (c ForwardOutputConfig) Build(bc operator.BuildContext) ([]operator.Operator, error) { - outputOperator, err := c.OutputConfig.Build(bc) - if err != nil { - return nil, err - } - - buffer, err := c.BufferConfig.Build(bc, c.ID()) - if err != nil { - return nil, err - } - - if c.Address == "" { - return nil, errors.NewError("missing required parameter 'address'", "") - } - - flusher := c.FlusherConfig.Build(bc.Logger.SugaredLogger) - - ctx, cancel := context.WithCancel(context.Background()) - - forwardOutput := &ForwardOutput{ - OutputOperator: outputOperator, - buffer: buffer, - flusher: flusher, - ctx: ctx, - cancel: cancel, - client: &http.Client{}, - address: c.Address, - } - - return []operator.Operator{forwardOutput}, nil -} - -// ForwardOutput is an operator that sends entries to another stanza instance -type ForwardOutput struct { - helper.OutputOperator - buffer buffer.Buffer - flusher *flusher.Flusher - - client *http.Client - address string - - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup -} - -// Start signals to the ForwardOutput to begin flushing -func (f *ForwardOutput) Start() error { - f.wg.Add(1) - go func() { - defer f.wg.Done() - f.feedFlusher(f.ctx) - }() - - return nil -} - -// Stop tells the ForwardOutput to stop gracefully -func (f *ForwardOutput) Stop() error { - f.cancel() - f.wg.Wait() - f.flusher.Stop() - return f.buffer.Close() -} - -// Process adds an entry to the outputs buffer -func (f *ForwardOutput) Process(ctx context.Context, entry *entry.Entry) error { - return f.buffer.Add(ctx, entry) -} - -// ProcessMulti will send entries to elasticsearch. -func (f *ForwardOutput) createRequest(ctx context.Context, entries []*entry.Entry) (*http.Request, error) { - var b bytes.Buffer - enc := json.NewEncoder(&b) - err := enc.Encode(entries) - if err != nil { - return nil, err - } - - return http.NewRequestWithContext(ctx, "POST", f.address, &b) -} - -func (f *ForwardOutput) feedFlusher(ctx context.Context) { - for { - entries, clearer, err := f.buffer.ReadChunk(ctx) - if err != nil && err == context.Canceled { - return - } else if err != nil { - f.Errorf("Failed to read chunk", zap.Error(err)) - continue - } - - f.flusher.Do(func(ctx context.Context) error { - req, err := f.createRequest(ctx, entries) - if err != nil { - f.Errorf("Failed to create request", zap.Error(err)) - // drop these logs because we couldn't creat a request and a retry won't help - if err := clearer.MarkAllAsFlushed(); err != nil { - f.Errorf("Failed to mark entries as flushed after failing to create a request", zap.Error(err)) - } - return nil - } - - res, err := f.client.Do(req) - if err != nil { - return errors.Wrap(err, "send request") - } - - if err := f.handleResponse(res); err != nil { - return err - } - - if err = clearer.MarkAllAsFlushed(); err != nil { - f.Errorw("Failed to mark entries as flushed", zap.Error(err)) - } - return nil - }) - } -} - -func (f *ForwardOutput) handleResponse(res *http.Response) error { - if !(res.StatusCode >= 200 && res.StatusCode < 300) { - body, err := ioutil.ReadAll(res.Body) - if err != nil { - return errors.NewError("unexpected status code", "", "status", res.Status) - } else { - res.Body.Close() - return errors.NewError("unexpected status code", "", "status", res.Status, "body", string(body)) - } - } - res.Body.Close() - return nil -} diff --git a/operator/builtin/output/forward/forward_test.go b/operator/builtin/output/forward/forward_test.go deleted file mode 100644 index 02c854fb..00000000 --- a/operator/builtin/output/forward/forward_test.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright The OpenTelemetry Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package forward - -import ( - "context" - "encoding/json" - "io/ioutil" - "net/http" - "net/http/httptest" - "testing" - "time" - - "github.com/open-telemetry/opentelemetry-log-collection/entry" - "github.com/open-telemetry/opentelemetry-log-collection/operator/buffer" - "github.com/open-telemetry/opentelemetry-log-collection/operator/helper" - "github.com/open-telemetry/opentelemetry-log-collection/testutil" - "github.com/stretchr/testify/require" -) - -func TestForwardOutput(t *testing.T) { - received := make(chan []byte, 1) - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { - body, _ := ioutil.ReadAll(req.Body) - received <- body - })) - - cfg := NewForwardOutputConfig("test") - memoryCfg := buffer.NewMemoryBufferConfig() - memoryCfg.MaxChunkDelay = helper.NewDuration(50 * time.Millisecond) - cfg.BufferConfig = buffer.Config{ - Builder: memoryCfg, - } - cfg.Address = srv.URL - - ops, err := cfg.Build(testutil.NewBuildContext(t)) - require.NoError(t, err) - forwardOutput := ops[0].(*ForwardOutput) - - newEntry := entry.New() - newEntry.Record = "test" - newEntry.Timestamp = newEntry.Timestamp.Round(time.Second) - require.NoError(t, forwardOutput.Start()) - defer forwardOutput.Stop() - require.NoError(t, forwardOutput.Process(context.Background(), newEntry)) - - select { - case <-time.After(time.Second): - require.FailNow(t, "Timed out waiting for server to receive entry") - case body := <-received: - var entries []*entry.Entry - require.NoError(t, json.Unmarshal(body, &entries)) - require.Len(t, entries, 1) - e := entries[0] - require.True(t, newEntry.Timestamp.Equal(e.Timestamp)) - require.Equal(t, newEntry.Record, e.Record) - require.Equal(t, newEntry.Severity, e.Severity) - require.Equal(t, newEntry.SeverityText, e.SeverityText) - require.Equal(t, newEntry.Labels, e.Labels) - require.Equal(t, newEntry.Resource, e.Resource) - } -}