From 8f3f191c06d63e6838765b45bcabe425fede8812 Mon Sep 17 00:00:00 2001 From: JustWPH Date: Wed, 31 Mar 2021 22:47:16 +0800 Subject: [PATCH] Add performance test for syslog receiver (#2942) Resolved #2528 --- cmd/otelcontribcol/components.go | 2 + go.mod | 1 + go.sum | 1 + testbed/datasenders/syslog.go | 132 +++++++++++++++++++++++++++++++ testbed/tests/log_test.go | 27 +++++++ 5 files changed, 163 insertions(+) create mode 100644 testbed/datasenders/syslog.go diff --git a/cmd/otelcontribcol/components.go b/cmd/otelcontribcol/components.go index 0ca518b87817..a205c4670f1b 100644 --- a/cmd/otelcontribcol/components.go +++ b/cmd/otelcontribcol/components.go @@ -71,6 +71,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/simpleprometheusreceiver" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/wavefrontreceiver" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/windowsperfcountersreceiver" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zookeeperreceiver" @@ -119,6 +120,7 @@ func components() (component.Factories, error) { wavefrontreceiver.NewFactory(), windowsperfcountersreceiver.NewFactory(), zookeeperreceiver.NewFactory(), + syslogreceiver.NewFactory(), } receivers = append(receivers, extraReceivers()...) diff --git a/go.mod b/go.mod index f288a3670b0f..7467354cfac2 100644 --- a/go.mod +++ b/go.mod @@ -57,6 +57,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/receiver/simpleprometheusreceiver v0.0.0-00010101000000-000000000000 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver v0.0.0-00010101000000-000000000000 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver v0.0.0-00010101000000-000000000000 + github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver v0.0.0-00010101000000-000000000000 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/wavefrontreceiver v0.0.0-00010101000000-000000000000 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/windowsperfcountersreceiver v0.0.0-00010101000000-000000000000 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zookeeperreceiver v0.0.0-00010101000000-000000000000 diff --git a/go.sum b/go.sum index 559dce40f89c..b5a8c2de2421 100644 --- a/go.sum +++ b/go.sum @@ -1065,6 +1065,7 @@ github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/observiq/ctimefmt v1.0.0 h1:r7vTJ+Slkrt9fZ67mkf+mA6zAdR5nGIJRMTzkUyvilk= github.com/observiq/ctimefmt v1.0.0/go.mod h1:mxi62//WbSpG/roCO1c6MqZ7zQTvjVtYheqHN3eOjvc= +github.com/observiq/go-syslog/v3 v3.0.2 h1:vaeINFErM/E3cKE2Ot1FAhhGq5mv7uGBOzjnGL3qhbY= github.com/observiq/go-syslog/v3 v3.0.2/go.mod h1:9abcumkQwDUY0VgWdH6CaaJ3Ks39A7NvIelMlavPru0= github.com/observiq/nanojack v0.0.0-20201106172433-343928847ebc h1:49ewVBwLcy+eYqI4R0ICilCI4dPjddpFXWv3liXzUxM= github.com/observiq/nanojack v0.0.0-20201106172433-343928847ebc/go.mod h1:WXIHwGy+c7/IK2PiJ4oxuTHkpnkSut7TNFpKnI5llPU= diff --git a/testbed/datasenders/syslog.go b/testbed/datasenders/syslog.go new file mode 100644 index 000000000000..e3921705ea0e --- /dev/null +++ b/testbed/datasenders/syslog.go @@ -0,0 +1,132 @@ +// Copyright OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datasenders + +import ( + "bytes" + "context" + "fmt" + "net" + "strings" + "time" + + "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/testbed/testbed" +) + +type SyslogWriter struct { + testbed.DataSenderBase + conn net.Conn + buf []string + bufSize int + network string +} + +var _ testbed.LogDataSender = (*SyslogWriter)(nil) + +func NewSyslogWriter(network string, host string, port int, batchSize int) *SyslogWriter { + f := &SyslogWriter{ + network: network, + bufSize: batchSize, + DataSenderBase: testbed.DataSenderBase{ + Port: port, + Host: host, + }, + } + return f +} + +func (f *SyslogWriter) GetEndpoint() net.Addr { + var addr net.Addr + switch f.network { + case "udp": + addr, _ = net.ResolveUDPAddr(f.network, fmt.Sprintf("%s:%d", f.Host, f.Port)) + + default: + addr, _ = net.ResolveTCPAddr(f.network, fmt.Sprintf("%s:%d", f.Host, f.Port)) + } + return addr +} + +func (f *SyslogWriter) Start() (err error) { + f.conn, err = net.Dial(f.GetEndpoint().Network(), f.GetEndpoint().String()) + // udp not ack, can't use net.Dial to check udp server is ready, use sleep 1 second to wait udp server start + if f.network == "udp" { + time.Sleep(1 * time.Second) + } + return err +} + +func (f *SyslogWriter) ConsumeLogs(_ context.Context, logs pdata.Logs) error { + for i := 0; i < logs.ResourceLogs().Len(); i++ { + for j := 0; j < logs.ResourceLogs().At(i).InstrumentationLibraryLogs().Len(); j++ { + ills := logs.ResourceLogs().At(i).InstrumentationLibraryLogs().At(j) + for k := 0; k < ills.Logs().Len(); k++ { + err := f.Send(ills.Logs().At(k)) + if err != nil { + return err + } + } + } + } + return nil +} + +func (f *SyslogWriter) GenConfigYAMLStr() string { + return fmt.Sprintf(` + syslog: + protocol: rfc5424 + %s: + listen_address: "%s" +`, f.network, f.GetEndpoint()) +} +func (f *SyslogWriter) Send(lr pdata.LogRecord) error { + ts := time.Unix(int64(lr.Timestamp()/1000000000), int64(lr.Timestamp()%100000000)).Format(time.RFC3339Nano) + sdid := strings.Builder{} + sdid.WriteString(fmt.Sprintf("%s=\"%s\" ", "trace_id", lr.TraceID().HexString())) + sdid.WriteString(fmt.Sprintf("%s=\"%s\" ", "span_id", lr.SpanID().HexString())) + sdid.WriteString(fmt.Sprintf("%s=\"%d\" ", "trace_flags", lr.Flags())) + lr.Attributes().ForEach(func(k string, v pdata.AttributeValue) { + sdid.WriteString(fmt.Sprintf("%s=\"%s\" ", k, v.StringVal())) + }) + msg := fmt.Sprintf("<166> %s localhost %s - - [%s] %s\n", ts, lr.Name(), sdid.String(), lr.Body().StringVal()) + + f.buf = append(f.buf, msg) + return f.SendCheck() +} + +func (f *SyslogWriter) SendCheck() error { + if len(f.buf) == f.bufSize { + b := bytes.NewBufferString("") + for _, v := range f.buf { + b.WriteString(v) + } + + _, err := f.conn.Write(b.Bytes()) + f.buf = []string{} + if err != nil { + return nil + } + + } + return nil +} + +func (f *SyslogWriter) Flush() { +} + +func (f *SyslogWriter) ProtocolName() string { + return "syslog" +} diff --git a/testbed/tests/log_test.go b/testbed/tests/log_test.go index 6f85a6238aea..3b7608ff15ed 100644 --- a/testbed/tests/log_test.go +++ b/testbed/tests/log_test.go @@ -89,6 +89,33 @@ func TestLog10kDPS(t *testing.T) { ExpectedMaxRAM: 150, }, }, + { + name: "syslog-udp", + sender: datasenders.NewSyslogWriter("udp", testbed.DefaultHost, testbed.GetAvailablePort(t), 1), + receiver: testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)), + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 80, + ExpectedMaxRAM: 150, + }, + }, + { + name: "syslog-tcp-batch-1", + sender: datasenders.NewSyslogWriter("tcp", testbed.DefaultHost, testbed.GetAvailablePort(t), 1), + receiver: testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)), + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 80, + ExpectedMaxRAM: 150, + }, + }, + { + name: "syslog-tcp-batch-100", + sender: datasenders.NewSyslogWriter("tcp", testbed.DefaultHost, testbed.GetAvailablePort(t), 100), + receiver: testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)), + resourceSpec: testbed.ResourceSpec{ + ExpectedMaxCPU: 80, + ExpectedMaxRAM: 150, + }, + }, /* { name: "FluentBitToOTLP",