Skip to content

Commit

Permalink
Add performance test for syslog receiver (open-telemetry#2942)
Browse files Browse the repository at this point in the history
Resolved #2528
  • Loading branch information
wph95 authored and pmatyjasek-sumo committed Apr 28, 2021
1 parent 8a6ec94 commit d204b84
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/simpleprometheusreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/wavefrontreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/windowsperfcountersreceiver"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zookeeperreceiver"
Expand Down Expand Up @@ -119,6 +120,7 @@ func components() (component.Factories, error) {
wavefrontreceiver.NewFactory(),
windowsperfcountersreceiver.NewFactory(),
zookeeperreceiver.NewFactory(),
syslogreceiver.NewFactory(),
}

receivers = append(receivers, extraReceivers()...)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/simpleprometheusreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/statsdreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/wavefrontreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/windowsperfcountersreceiver v0.0.0-00010101000000-000000000000
github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zookeeperreceiver v0.0.0-00010101000000-000000000000
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,7 @@ github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/observiq/ctimefmt v1.0.0 h1:r7vTJ+Slkrt9fZ67mkf+mA6zAdR5nGIJRMTzkUyvilk=
github.com/observiq/ctimefmt v1.0.0/go.mod h1:mxi62//WbSpG/roCO1c6MqZ7zQTvjVtYheqHN3eOjvc=
github.com/observiq/go-syslog/v3 v3.0.2 h1:vaeINFErM/E3cKE2Ot1FAhhGq5mv7uGBOzjnGL3qhbY=
github.com/observiq/go-syslog/v3 v3.0.2/go.mod h1:9abcumkQwDUY0VgWdH6CaaJ3Ks39A7NvIelMlavPru0=
github.com/observiq/nanojack v0.0.0-20201106172433-343928847ebc h1:49ewVBwLcy+eYqI4R0ICilCI4dPjddpFXWv3liXzUxM=
github.com/observiq/nanojack v0.0.0-20201106172433-343928847ebc/go.mod h1:WXIHwGy+c7/IK2PiJ4oxuTHkpnkSut7TNFpKnI5llPU=
Expand Down
132 changes: 132 additions & 0 deletions testbed/datasenders/syslog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datasenders

import (
"bytes"
"context"
"fmt"
"net"
"strings"
"time"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/testbed/testbed"
)

type SyslogWriter struct {
testbed.DataSenderBase
conn net.Conn
buf []string
bufSize int
network string
}

var _ testbed.LogDataSender = (*SyslogWriter)(nil)

func NewSyslogWriter(network string, host string, port int, batchSize int) *SyslogWriter {
f := &SyslogWriter{
network: network,
bufSize: batchSize,
DataSenderBase: testbed.DataSenderBase{
Port: port,
Host: host,
},
}
return f
}

func (f *SyslogWriter) GetEndpoint() net.Addr {
var addr net.Addr
switch f.network {
case "udp":
addr, _ = net.ResolveUDPAddr(f.network, fmt.Sprintf("%s:%d", f.Host, f.Port))

default:
addr, _ = net.ResolveTCPAddr(f.network, fmt.Sprintf("%s:%d", f.Host, f.Port))
}
return addr
}

func (f *SyslogWriter) Start() (err error) {
f.conn, err = net.Dial(f.GetEndpoint().Network(), f.GetEndpoint().String())
// udp not ack, can't use net.Dial to check udp server is ready, use sleep 1 second to wait udp server start
if f.network == "udp" {
time.Sleep(1 * time.Second)
}
return err
}

func (f *SyslogWriter) ConsumeLogs(_ context.Context, logs pdata.Logs) error {
for i := 0; i < logs.ResourceLogs().Len(); i++ {
for j := 0; j < logs.ResourceLogs().At(i).InstrumentationLibraryLogs().Len(); j++ {
ills := logs.ResourceLogs().At(i).InstrumentationLibraryLogs().At(j)
for k := 0; k < ills.Logs().Len(); k++ {
err := f.Send(ills.Logs().At(k))
if err != nil {
return err
}
}
}
}
return nil
}

func (f *SyslogWriter) GenConfigYAMLStr() string {
return fmt.Sprintf(`
syslog:
protocol: rfc5424
%s:
listen_address: "%s"
`, f.network, f.GetEndpoint())
}
func (f *SyslogWriter) Send(lr pdata.LogRecord) error {
ts := time.Unix(int64(lr.Timestamp()/1000000000), int64(lr.Timestamp()%100000000)).Format(time.RFC3339Nano)
sdid := strings.Builder{}
sdid.WriteString(fmt.Sprintf("%s=\"%s\" ", "trace_id", lr.TraceID().HexString()))
sdid.WriteString(fmt.Sprintf("%s=\"%s\" ", "span_id", lr.SpanID().HexString()))
sdid.WriteString(fmt.Sprintf("%s=\"%d\" ", "trace_flags", lr.Flags()))
lr.Attributes().ForEach(func(k string, v pdata.AttributeValue) {
sdid.WriteString(fmt.Sprintf("%s=\"%s\" ", k, v.StringVal()))
})
msg := fmt.Sprintf("<166> %s localhost %s - - [%s] %s\n", ts, lr.Name(), sdid.String(), lr.Body().StringVal())

f.buf = append(f.buf, msg)
return f.SendCheck()
}

func (f *SyslogWriter) SendCheck() error {
if len(f.buf) == f.bufSize {
b := bytes.NewBufferString("")
for _, v := range f.buf {
b.WriteString(v)
}

_, err := f.conn.Write(b.Bytes())
f.buf = []string{}
if err != nil {
return nil
}

}
return nil
}

func (f *SyslogWriter) Flush() {
}

func (f *SyslogWriter) ProtocolName() string {
return "syslog"
}
27 changes: 27 additions & 0 deletions testbed/tests/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,33 @@ func TestLog10kDPS(t *testing.T) {
ExpectedMaxRAM: 150,
},
},
{
name: "syslog-udp",
sender: datasenders.NewSyslogWriter("udp", testbed.DefaultHost, testbed.GetAvailablePort(t), 1),
receiver: testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)),
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 80,
ExpectedMaxRAM: 150,
},
},
{
name: "syslog-tcp-batch-1",
sender: datasenders.NewSyslogWriter("tcp", testbed.DefaultHost, testbed.GetAvailablePort(t), 1),
receiver: testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)),
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 80,
ExpectedMaxRAM: 150,
},
},
{
name: "syslog-tcp-batch-100",
sender: datasenders.NewSyslogWriter("tcp", testbed.DefaultHost, testbed.GetAvailablePort(t), 100),
receiver: testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)),
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 80,
ExpectedMaxRAM: 150,
},
},
/*
{
name: "FluentBitToOTLP",
Expand Down

0 comments on commit d204b84

Please sign in to comment.