Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add performance test for syslog receiver #2942

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/simpleprometheusreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/wavefrontreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/windowsperfcountersreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zookeeperreceiver"
Expand Down Expand Up @@ -119,6 +120,7 @@ func components() (component.Factories, error) {
wavefrontreceiver.NewFactory(),
windowsperfcountersreceiver.NewFactory(),
zookeeperreceiver.NewFactory(),
syslogreceiver.NewFactory(),
}

receivers = append(receivers, extraReceivers()...)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/simpleprometheusreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/wavefrontreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/windowsperfcountersreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zookeeperreceiver v0.0.0-00010101000000-000000000000
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,7 @@ github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/observiq/ctimefmt v1.0.0 h1:r7vTJ+Slkrt9fZ67mkf+mA6zAdR5nGIJRMTzkUyvilk=
github.com/observiq/ctimefmt v1.0.0/go.mod h1:mxi62//WbSpG/roCO1c6MqZ7zQTvjVtYheqHN3eOjvc=
github.com/observiq/go-syslog/v3 v3.0.2 h1:vaeINFErM/E3cKE2Ot1FAhhGq5mv7uGBOzjnGL3qhbY=
github.com/observiq/go-syslog/v3 v3.0.2/go.mod h1:9abcumkQwDUY0VgWdH6CaaJ3Ks39A7NvIelMlavPru0=
github.com/observiq/nanojack v0.0.0-20201106172433-343928847ebc h1:49ewVBwLcy+eYqI4R0ICilCI4dPjddpFXWv3liXzUxM=
github.com/observiq/nanojack v0.0.0-20201106172433-343928847ebc/go.mod h1:WXIHwGy+c7/IK2PiJ4oxuTHkpnkSut7TNFpKnI5llPU=
Expand Down
132 changes: 132 additions & 0 deletions testbed/datasenders/syslog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datasenders

import (
"bytes"
"context"
"fmt"
"net"
"strings"
"time"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/testbed/testbed"
)

type SyslogWriter struct {
testbed.DataSenderBase
conn net.Conn
buf []string
bufSize int
network string
}

var _ testbed.LogDataSender = (*SyslogWriter)(nil)

func NewSyslogWriter(network string, host string, port int, batchSize int) *SyslogWriter {
f := &SyslogWriter{
network: network,
bufSize: batchSize,
DataSenderBase: testbed.DataSenderBase{
Port: port,
Host: host,
},
}
return f
}

func (f *SyslogWriter) GetEndpoint() net.Addr {
var addr net.Addr
switch f.network {
case "udp":
addr, _ = net.ResolveUDPAddr(f.network, fmt.Sprintf("%s:%d", f.Host, f.Port))

default:
addr, _ = net.ResolveTCPAddr(f.network, fmt.Sprintf("%s:%d", f.Host, f.Port))
}
return addr
}

func (f *SyslogWriter) Start() (err error) {
f.conn, err = net.Dial(f.GetEndpoint().Network(), f.GetEndpoint().String())
// udp not ack, can't use net.Dial to check udp server is ready, use sleep 1 second to wait udp server start
if f.network == "udp" {
time.Sleep(1 * time.Second)
}
return err
}

func (f *SyslogWriter) ConsumeLogs(_ context.Context, logs pdata.Logs) error {
for i := 0; i < logs.ResourceLogs().Len(); i++ {
for j := 0; j < logs.ResourceLogs().At(i).InstrumentationLibraryLogs().Len(); j++ {
ills := logs.ResourceLogs().At(i).InstrumentationLibraryLogs().At(j)
for k := 0; k < ills.Logs().Len(); k++ {
err := f.Send(ills.Logs().At(k))
if err != nil {
return err
}
}
}
}
return nil
}

func (f *SyslogWriter) GenConfigYAMLStr() string {
return fmt.Sprintf(`
syslog:
protocol: rfc5424
%s:
listen_address: "%s"
`, f.network, f.GetEndpoint())
}
func (f *SyslogWriter) Send(lr pdata.LogRecord) error {
ts := time.Unix(int64(lr.Timestamp()/1000000000), int64(lr.Timestamp()%100000000)).Format(time.RFC3339Nano)
sdid := strings.Builder{}
sdid.WriteString(fmt.Sprintf("%s=\"%s\" ", "trace_id", lr.TraceID().HexString()))
sdid.WriteString(fmt.Sprintf("%s=\"%s\" ", "span_id", lr.SpanID().HexString()))
sdid.WriteString(fmt.Sprintf("%s=\"%d\" ", "trace_flags", lr.Flags()))
lr.Attributes().ForEach(func(k string, v pdata.AttributeValue) {
sdid.WriteString(fmt.Sprintf("%s=\"%s\" ", k, v.StringVal()))
})
msg := fmt.Sprintf("<166> %s localhost %s - - [%s] %s\n", ts, lr.Name(), sdid.String(), lr.Body().StringVal())

f.buf = append(f.buf, msg)
return f.SendCheck()
}

func (f *SyslogWriter) SendCheck() error {
if len(f.buf) == f.bufSize {
b := bytes.NewBufferString("")
for _, v := range f.buf {
b.WriteString(v)
}

_, err := f.conn.Write(b.Bytes())
f.buf = []string{}
if err != nil {
return nil
}

}
return nil
}

func (f *SyslogWriter) Flush() {
}

func (f *SyslogWriter) ProtocolName() string {
return "syslog"
}
27 changes: 27 additions & 0 deletions testbed/tests/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,33 @@ func TestLog10kDPS(t *testing.T) {
ExpectedMaxRAM: 150,
},
},
{
name: "syslog-udp",
sender: datasenders.NewSyslogWriter("udp", testbed.DefaultHost, testbed.GetAvailablePort(t), 1),
receiver: testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)),
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 80,
ExpectedMaxRAM: 150,
},
},
{
name: "syslog-tcp-batch-1",
sender: datasenders.NewSyslogWriter("tcp", testbed.DefaultHost, testbed.GetAvailablePort(t), 1),
receiver: testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)),
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 80,
ExpectedMaxRAM: 150,
},
},
{
name: "syslog-tcp-batch-100",
sender: datasenders.NewSyslogWriter("tcp", testbed.DefaultHost, testbed.GetAvailablePort(t), 100),
receiver: testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)),
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 80,
ExpectedMaxRAM: 150,
},
},
/*
{
name: "FluentBitToOTLP",
Expand Down