Skip to content
This repository has been archived by the owner on May 25, 2022. It is now read-only.

TCP Input: TLS support #29

Merged
merged 3 commits into from
Feb 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

### Changed
- Added TLS support to `tcp_input` operator

## [0.15.0] - 2020-02-25

### Added
Expand Down
14 changes: 13 additions & 1 deletion docs/operators/tcp_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,30 @@ The `tcp_input` operator listens for logs on one or more TCP connections. The op
| `id` | `tcp_input` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `listen_address` | required | A listen address of the form `<ip>:<port>` |
| `tls` | | An optional `TLS` configuration (see the TLS configuration section) |
| `write_to` | $ | The record [field](/docs/types/field.md) written to when creating a new log entry |
| `labels` | {} | A map of `key: value` labels to add to the entry's labels |
| `resource` | {} | A map of `key: value` labels to add to the entry's resource |

#### TLS Configuration

The `tcp_input` operator supports TLS, disabled by default.

| Field | Default | Description |
| --- | --- | --- |
| `enable` | `false` | Boolean value to enable or disable TLS |
| `certificate` | | File path for the X509 certificate chain |
| `private_key` | | File path for the X509 private key |


### Example Configurations

#### Simple

Configuration:
```yaml
- type: tcp_input
listen_adress: "0.0.0.0:54525"
listen_address: "0.0.0.0:54525"
```

Send a log:
Expand Down
78 changes: 68 additions & 10 deletions operator/builtin/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"fmt"
"net"
"sync"
"time"
"crypto/rand"
"crypto/tls"

"github.com/open-telemetry/opentelemetry-log-collection/operator"
"github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
Expand All @@ -41,7 +44,20 @@ func NewTCPInputConfig(operatorID string) *TCPInputConfig {
type TCPInputConfig struct {
helper.InputConfig `yaml:",inline"`

ListenAddress string `json:"listen_address,omitempty" yaml:"listen_address,omitempty"`
ListenAddress string `json:"listen_address,omitempty" yaml:"listen_address,omitempty"`
TLS TLSConfig `json:"tls,omitempty" yaml:"tls,omitempty"`
}

// TLSConfig is the configuration for a TLS listener
type TLSConfig struct {
// Enable forces the use of TLS
Enable bool `json:"enable,omitempty" yaml:"enable,omitempty"`

// Certificate is the file path for the certificate
Certificate string `json:"certificate,omitempty" yaml:"certificate,omitempty"`

// PrivateKey is the file path for the private key
PrivateKey string `json:"private_key,omitempty" yaml:"private_key,omitempty"`
}

// Build will build a tcp input operator.
Expand All @@ -55,42 +71,84 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
return nil, fmt.Errorf("missing required parameter 'listen_address'")
}

address, err := net.ResolveTCPAddr("tcp", c.ListenAddress)
if err != nil {
// validate the input address
if _, err := net.ResolveTCPAddr("tcp", c.ListenAddress); err != nil {
return nil, fmt.Errorf("failed to resolve listen_address: %s", err)
}

cert := tls.Certificate{}
if c.TLS.Enable {
if c.TLS.Certificate == "" {
return nil, fmt.Errorf("missing required parameter 'certificate', required when TLS is enabled")
}

if c.TLS.PrivateKey == "" {
return nil, fmt.Errorf("missing required parameter 'private_key', required when TLS is enabled")
}

c, err := tls.LoadX509KeyPair(c.TLS.Certificate, c.TLS.PrivateKey)
if err != nil {
return nil, fmt.Errorf("failed to load tls certificate: %w", err)
}
cert = c
}

tcpInput := &TCPInput{
InputOperator: inputOperator,
address: address,
address: c.ListenAddress,
tlsEnable: c.TLS.Enable,
tlsKeyPair: cert,
}
return []operator.Operator{tcpInput}, nil
}

// TCPInput is an operator that listens for log entries over tcp.
type TCPInput struct {
helper.InputOperator
address *net.TCPAddr
address string
tlsEnable bool
tlsKeyPair tls.Certificate

listener *net.TCPListener
listener net.Listener
cancel context.CancelFunc
wg sync.WaitGroup
}

// Start will start listening for log entries over tcp.
func (t *TCPInput) Start() error {
listener, err := net.ListenTCP("tcp", t.address)
if err != nil {
if err := t.configureListener(); err != nil {
return fmt.Errorf("failed to listen on interface: %w", err)
}

t.listener = listener
ctx, cancel := context.WithCancel(context.Background())
t.cancel = cancel
t.goListen(ctx)
return nil
}

func (t *TCPInput) configureListener() error {
if ! t.tlsEnable {
listener, err := net.Listen("tcp", t.address)
if err != nil {
return fmt.Errorf("failed to configure tcp listener: %w", err)
}
t.listener = listener
return nil
}

config := tls.Config{Certificates: []tls.Certificate{t.tlsKeyPair}}
config.Time = func() time.Time { return time.Now() }
config.Rand = rand.Reader

listener, err := tls.Listen("tcp", t.address, &config)
if err != nil {
return fmt.Errorf("failed to configure tls listener: %w", err)
}

t.listener = listener
return nil
}

// goListenn will listen for tcp connections.
func (t *TCPInput) goListen(ctx context.Context) {
t.wg.Add(1)
Expand All @@ -99,7 +157,7 @@ func (t *TCPInput) goListen(ctx context.Context) {
defer t.wg.Done()

for {
conn, err := t.listener.AcceptTCP()
conn, err := t.listener.Accept()
if err != nil {
select {
case <-ctx.Done():
Expand Down
126 changes: 126 additions & 0 deletions operator/builtin/input/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package tcp

import (
"os"
"net"
"testing"
"time"
"crypto/tls"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"github.com/open-telemetry/opentelemetry-log-collection/operator"
Expand All @@ -26,6 +28,58 @@ import (
"github.com/stretchr/testify/require"
)

const testTLSPrivateKey = `
-----BEGIN PRIVATE KEY-----
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDdNdVRHDoOlwrQ
YNlzP6MdLEIvN03Pv3A/Cdyy8LgKgSEf3kmw8o/75tSQzIAR6v7ts/qq1iAwE3OL
s4r8lASj2wirF2fNxX12OvIP8g3mrs4tCANBh413IywVKcEOrry71/s1k7+hscMv
Fe3NLxD1mNKJogwKyifvSc15zx8ge8SLjp875NiLCni2YYWXBt1pqd4wCol8lX6v
3u2rbNXrQf2sLncD0CE45EWHnzLzK33a0BwxyTXAOdd9kindL2IFct9C2HRQEk5h
GaXbNN0f6EMOZOzadJHfMledKVJ1XOd+t/kaPzY4NLDaGad04pNa+jph54qIVL5b
gCTOivX1AgMBAAECggEBAKPll/hxrn5S4LtFlrdyJfueaCctlaRgFd1PBEs8WU/H
HvDKtNS6031zKHlkW1trPpiF6iqbXdvg/ZI7Y7YCQXHZ/pEtVUa7lVp9EA5KbIxH
ZhEtR6RMt77Wu3mupxCm3MVcoA6xOqGl4JTJbZjBz5H4Ob2p57wyzeXYS7p9gHWC
fSj8tEqJdjLt7lqtqaWg/3iqqnLPdT3fGL6uyVbCDn9VZ23C7+sHiUfG67xHiF97
UT+O+dfADMY6rLY1njxdD0QGPS7MQLHAgL/ESjROSL4cj1f9VYJFgweAE/UxnDVQ
n3pTzHFItjYWtK75o7Yc/zaHKp5hsXMsiVb9gtmBcaECgYEA+i2viVdZQqItIDiJ
rc7M42Fo6mLv1gToOVaIst7qPmW6BlwSQbX/x2V/2UsMWtcL95mrmRVjK9iH/Pg8
ZaMlJynpgTM/x0jlZ2gZW1DPJWiCJ97xsdbOBA4JiGExc7odkbZhecfdlf66h0N6
Ll32k80PNqTDJV8wWuUxsEnJaLkCgYEA4luVgtnhiJx3FIfBM9p/EVearFsQFSil
PPeoJfc5GMGAnNeGBv5YI4wZ5Jaa0qHLg5ps5Y8vO1yWKiAuhgVKXhytOj86XsoL
MdisDYcxzskG/9ipX3fP1rBNgwdzBoP4QcpzV69weDsja8AU2pluKSd3r3nzwqsY
dc/NVJRsYR0CgYAw2scSrOoTZxQk3KWWOXItXRJd4yAuzRqER++97mYT9U2UfFpc
VqwyRhHnXw50ltYRbgLijBinsUstDVTODEPvF/IvdtCXnBagUOXSvT8WcQgpvRG5
xtbIV+1oooJDtS6dC96RJ4SQDARk8bpkX5kNV9gGtboeDC6nMWa4pFAekQKBgQCm
naM/3gEU/ZbplcOw13QQ39sKYz1DVdfLOMCcsY1lm4l/6WTOYQmfoNCuYe00fcO/
6zuc/fhWSaB/AZE9NUe4XoNkDIZ6n13+Iu8CRjFzdKWiTWjezOI/tSZY/HK+qQVj
6BFeydSPq3g3J/wxrB5aTKLcl3fGIwquLXeGenoMQQKBgQCWULypEeQwJsyKB57P
JzuCnFMvLL5qSNwot5c7I+AX5yi368dEurQl6pUUJ9VKNbpsUxFIMq9AHpddDoq/
+nIVt1DYr55ZsUJ6SgYtjvCMT9WOE/1Kqfh6p6y/mgRUl8m6v6gqi5/RfsNWJwfl
iBXhcGCQfkwZ8YIUyTW89qrwMw==
-----END PRIVATE KEY-----`

const testTLSCertificate = `
-----BEGIN CERTIFICATE-----
MIIDVDCCAjwCCQCwsE+LGRRtBTANBgkqhkiG9w0BAQsFADBsMQswCQYDVQQGEwJV
UzERMA8GA1UECAwITWljaGlnYW4xFTATBgNVBAcMDEdyYW5kIFJhcGlkczERMA8G
A1UECgwIb2JzZXJ2aVExDzANBgNVBAsMBlN0YW56YTEPMA0GA1UEAwwGU3Rhbnph
MB4XDTIxMDIyNTE3MzgxM1oXDTQ4MDcxMjE3MzgxM1owbDELMAkGA1UEBhMCVVMx
ETAPBgNVBAgMCE1pY2hpZ2FuMRUwEwYDVQQHDAxHcmFuZCBSYXBpZHMxETAPBgNV
BAoMCG9ic2VydmlRMQ8wDQYDVQQLDAZTdGFuemExDzANBgNVBAMMBlN0YW56YTCC
ASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAN011VEcOg6XCtBg2XM/ox0s
Qi83Tc+/cD8J3LLwuAqBIR/eSbDyj/vm1JDMgBHq/u2z+qrWIDATc4uzivyUBKPb
CKsXZ83FfXY68g/yDeauzi0IA0GHjXcjLBUpwQ6uvLvX+zWTv6Gxwy8V7c0vEPWY
0omiDArKJ+9JzXnPHyB7xIuOnzvk2IsKeLZhhZcG3Wmp3jAKiXyVfq/e7ats1etB
/awudwPQITjkRYefMvMrfdrQHDHJNcA5132SKd0vYgVy30LYdFASTmEZpds03R/o
Qw5k7Np0kd8yV50pUnVc5363+Ro/Njg0sNoZp3Tik1r6OmHniohUvluAJM6K9fUC
AwEAATANBgkqhkiG9w0BAQsFAAOCAQEA0u061goAXX7RxtdRO7Twz4zZIGS/oWvn
gj61zZIXt8LaTzRZFU9rs0rp7jPXKaszArJQc29anf1mWtRwQBAY0S0m4DkwoBln
7hMFf9MlisQvBVFjWgDo7QCJJmAxaPc1NZi8GQIANEMMZ+hLK17dhDB+6SdBbV4R
yx+7I3zcXQ+0H4Aym6KmvoIR3QAXsOYJ/43QzlYU63ryGYBAeg+JiD8fnr2W3QHb
BBdatHmcazlytT5KV+bANT/Ermw8y2tpWGWxMxQHveFh1zThYL8vkLi4fmZqqVCI
zv9WEy+9p05Aet+12x3dzRu93+yRIEYbSZ35NOUWfQ+gspF5rGgpxA==
-----END CERTIFICATE-----`

func tcpInputTest(input []byte, expected []string) func(t *testing.T) {
return func(t *testing.T) {
cfg := NewTCPInputConfig("test_id")
Expand Down Expand Up @@ -73,11 +127,83 @@ func tcpInputTest(input []byte, expected []string) func(t *testing.T) {
}
}

func tlsTCPInputTest(input []byte, expected []string) func(t *testing.T) {
return func(t *testing.T) {

f, err := os.Create("test.crt")
require.NoError(t, err)
defer f.Close()
defer os.Remove("test.crt")
_, err = f.WriteString(testTLSCertificate + "\n")
require.NoError(t, err)
f.Close()

f, err = os.Create("test.key")
require.NoError(t, err)
defer f.Close()
defer os.Remove("test.key")
_, err = f.WriteString(testTLSPrivateKey + "\n")
require.NoError(t, err)
f.Close()

cfg := NewTCPInputConfig("test_id")
cfg.ListenAddress = ":0"
cfg.TLS.Enable = true
cfg.TLS.Certificate = "test.crt"
cfg.TLS.PrivateKey = "test.key"

ops, err := cfg.Build(testutil.NewBuildContext(t))
require.NoError(t, err)
op := ops[0]

mockOutput := testutil.Operator{}
tcpInput := op.(*TCPInput)
tcpInput.InputOperator.OutputOperators = []operator.Operator{&mockOutput}

entryChan := make(chan *entry.Entry, 1)
mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
entryChan <- args.Get(1).(*entry.Entry)
}).Return(nil)

err = tcpInput.Start()
require.NoError(t, err)
defer tcpInput.Stop()

conn, err := tls.Dial("tcp", tcpInput.listener.Addr().String(), &tls.Config{InsecureSkipVerify: true})
require.NoError(t, err)
defer conn.Close()

_, err = conn.Write(input)
require.NoError(t, err)

for _, expectedMessage := range expected {
select {
case entry := <-entryChan:
require.Equal(t, expectedMessage, entry.Record)
case <-time.After(time.Second):
require.FailNow(t, "Timed out waiting for message to be written")
}
}

select {
case entry := <-entryChan:
require.FailNow(t, "Unexpected entry: %s", entry)
case <-time.After(100 * time.Millisecond):
return
}
}
}

func TestTcpInput(t *testing.T) {
t.Run("Simple", tcpInputTest([]byte("message\n"), []string{"message"}))
t.Run("CarriageReturn", tcpInputTest([]byte("message\r\n"), []string{"message"}))
}

func TestTLSTcpInput(t *testing.T) {
t.Run("Simple", tlsTCPInputTest([]byte("message\n"), []string{"message"}))
t.Run("CarriageReturn", tlsTCPInputTest([]byte("message\r\n"), []string{"message"}))
}

func BenchmarkTcpInput(b *testing.B) {
cfg := NewTCPInputConfig("test_id")
cfg.ListenAddress = ":0"
Expand Down