Skip to content

Commit

Permalink
TCP Input: TLS support (#29)
Browse files Browse the repository at this point in the history
* Add support for TLS to the TCP input operator

* Add TLS configurattion to TCP input operator documentation

* Fix module import path typo
  • Loading branch information
Joseph Sirianni authored Feb 25, 2021
1 parent 229c44e commit a2b90b7
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 11 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

### Changed
- Added TLS support to `tcp_input` operator

## [0.15.0] - 2020-02-25

### Added
Expand Down
14 changes: 13 additions & 1 deletion docs/operators/tcp_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,30 @@ The `tcp_input` operator listens for logs on one or more TCP connections. The op
| `id` | `tcp_input` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `listen_address` | required | A listen address of the form `<ip>:<port>` |
| `tls` | | An optional `TLS` configuration (see the TLS configuration section) |
| `write_to` | $ | The record [field](/docs/types/field.md) written to when creating a new log entry |
| `labels` | {} | A map of `key: value` labels to add to the entry's labels |
| `resource` | {} | A map of `key: value` labels to add to the entry's resource |

#### TLS Configuration

The `tcp_input` operator supports TLS, disabled by default.

| Field | Default | Description |
| --- | --- | --- |
| `enable` | `false` | Boolean value to enable or disable TLS |
| `certificate` | | File path for the X509 certificate chain |
| `private_key` | | File path for the X509 private key |


### Example Configurations

#### Simple

Configuration:
```yaml
- type: tcp_input
listen_adress: "0.0.0.0:54525"
listen_address: "0.0.0.0:54525"
```
Send a log:
Expand Down
78 changes: 68 additions & 10 deletions operator/builtin/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"fmt"
"net"
"sync"
"time"
"crypto/rand"
"crypto/tls"

"github.com/open-telemetry/opentelemetry-log-collection/operator"
"github.com/open-telemetry/opentelemetry-log-collection/operator/helper"
Expand All @@ -41,7 +44,20 @@ func NewTCPInputConfig(operatorID string) *TCPInputConfig {
type TCPInputConfig struct {
helper.InputConfig `yaml:",inline"`

ListenAddress string `json:"listen_address,omitempty" yaml:"listen_address,omitempty"`
ListenAddress string `json:"listen_address,omitempty" yaml:"listen_address,omitempty"`
TLS TLSConfig `json:"tls,omitempty" yaml:"tls,omitempty"`
}

// TLSConfig is the configuration for a TLS listener
type TLSConfig struct {
// Enable forces the use of TLS
Enable bool `json:"enable,omitempty" yaml:"enable,omitempty"`

// Certificate is the file path for the certificate
Certificate string `json:"certificate,omitempty" yaml:"certificate,omitempty"`

// PrivateKey is the file path for the private key
PrivateKey string `json:"private_key,omitempty" yaml:"private_key,omitempty"`
}

// Build will build a tcp input operator.
Expand All @@ -55,42 +71,84 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
return nil, fmt.Errorf("missing required parameter 'listen_address'")
}

address, err := net.ResolveTCPAddr("tcp", c.ListenAddress)
if err != nil {
// validate the input address
if _, err := net.ResolveTCPAddr("tcp", c.ListenAddress); err != nil {
return nil, fmt.Errorf("failed to resolve listen_address: %s", err)
}

cert := tls.Certificate{}
if c.TLS.Enable {
if c.TLS.Certificate == "" {
return nil, fmt.Errorf("missing required parameter 'certificate', required when TLS is enabled")
}

if c.TLS.PrivateKey == "" {
return nil, fmt.Errorf("missing required parameter 'private_key', required when TLS is enabled")
}

c, err := tls.LoadX509KeyPair(c.TLS.Certificate, c.TLS.PrivateKey)
if err != nil {
return nil, fmt.Errorf("failed to load tls certificate: %w", err)
}
cert = c
}

tcpInput := &TCPInput{
InputOperator: inputOperator,
address: address,
address: c.ListenAddress,
tlsEnable: c.TLS.Enable,
tlsKeyPair: cert,
}
return []operator.Operator{tcpInput}, nil
}

// TCPInput is an operator that listens for log entries over tcp.
type TCPInput struct {
helper.InputOperator
address *net.TCPAddr
address string
tlsEnable bool
tlsKeyPair tls.Certificate

listener *net.TCPListener
listener net.Listener
cancel context.CancelFunc
wg sync.WaitGroup
}

// Start will start listening for log entries over tcp.
func (t *TCPInput) Start() error {
listener, err := net.ListenTCP("tcp", t.address)
if err != nil {
if err := t.configureListener(); err != nil {
return fmt.Errorf("failed to listen on interface: %w", err)
}

t.listener = listener
ctx, cancel := context.WithCancel(context.Background())
t.cancel = cancel
t.goListen(ctx)
return nil
}

func (t *TCPInput) configureListener() error {
if ! t.tlsEnable {
listener, err := net.Listen("tcp", t.address)
if err != nil {
return fmt.Errorf("failed to configure tcp listener: %w", err)
}
t.listener = listener
return nil
}

config := tls.Config{Certificates: []tls.Certificate{t.tlsKeyPair}}
config.Time = func() time.Time { return time.Now() }
config.Rand = rand.Reader

listener, err := tls.Listen("tcp", t.address, &config)
if err != nil {
return fmt.Errorf("failed to configure tls listener: %w", err)
}

t.listener = listener
return nil
}

// goListenn will listen for tcp connections.
func (t *TCPInput) goListen(ctx context.Context) {
t.wg.Add(1)
Expand All @@ -99,7 +157,7 @@ func (t *TCPInput) goListen(ctx context.Context) {
defer t.wg.Done()

for {
conn, err := t.listener.AcceptTCP()
conn, err := t.listener.Accept()
if err != nil {
select {
case <-ctx.Done():
Expand Down
126 changes: 126 additions & 0 deletions operator/builtin/input/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package tcp

import (
"os"
"net"
"testing"
"time"
"crypto/tls"

"github.com/open-telemetry/opentelemetry-log-collection/entry"
"github.com/open-telemetry/opentelemetry-log-collection/operator"
Expand All @@ -26,6 +28,58 @@ import (
"github.com/stretchr/testify/require"
)

const testTLSPrivateKey = `
-----BEGIN PRIVATE KEY-----
MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDdNdVRHDoOlwrQ
YNlzP6MdLEIvN03Pv3A/Cdyy8LgKgSEf3kmw8o/75tSQzIAR6v7ts/qq1iAwE3OL
s4r8lASj2wirF2fNxX12OvIP8g3mrs4tCANBh413IywVKcEOrry71/s1k7+hscMv
Fe3NLxD1mNKJogwKyifvSc15zx8ge8SLjp875NiLCni2YYWXBt1pqd4wCol8lX6v
3u2rbNXrQf2sLncD0CE45EWHnzLzK33a0BwxyTXAOdd9kindL2IFct9C2HRQEk5h
GaXbNN0f6EMOZOzadJHfMledKVJ1XOd+t/kaPzY4NLDaGad04pNa+jph54qIVL5b
gCTOivX1AgMBAAECggEBAKPll/hxrn5S4LtFlrdyJfueaCctlaRgFd1PBEs8WU/H
HvDKtNS6031zKHlkW1trPpiF6iqbXdvg/ZI7Y7YCQXHZ/pEtVUa7lVp9EA5KbIxH
ZhEtR6RMt77Wu3mupxCm3MVcoA6xOqGl4JTJbZjBz5H4Ob2p57wyzeXYS7p9gHWC
fSj8tEqJdjLt7lqtqaWg/3iqqnLPdT3fGL6uyVbCDn9VZ23C7+sHiUfG67xHiF97
UT+O+dfADMY6rLY1njxdD0QGPS7MQLHAgL/ESjROSL4cj1f9VYJFgweAE/UxnDVQ
n3pTzHFItjYWtK75o7Yc/zaHKp5hsXMsiVb9gtmBcaECgYEA+i2viVdZQqItIDiJ
rc7M42Fo6mLv1gToOVaIst7qPmW6BlwSQbX/x2V/2UsMWtcL95mrmRVjK9iH/Pg8
ZaMlJynpgTM/x0jlZ2gZW1DPJWiCJ97xsdbOBA4JiGExc7odkbZhecfdlf66h0N6
Ll32k80PNqTDJV8wWuUxsEnJaLkCgYEA4luVgtnhiJx3FIfBM9p/EVearFsQFSil
PPeoJfc5GMGAnNeGBv5YI4wZ5Jaa0qHLg5ps5Y8vO1yWKiAuhgVKXhytOj86XsoL
MdisDYcxzskG/9ipX3fP1rBNgwdzBoP4QcpzV69weDsja8AU2pluKSd3r3nzwqsY
dc/NVJRsYR0CgYAw2scSrOoTZxQk3KWWOXItXRJd4yAuzRqER++97mYT9U2UfFpc
VqwyRhHnXw50ltYRbgLijBinsUstDVTODEPvF/IvdtCXnBagUOXSvT8WcQgpvRG5
xtbIV+1oooJDtS6dC96RJ4SQDARk8bpkX5kNV9gGtboeDC6nMWa4pFAekQKBgQCm
naM/3gEU/ZbplcOw13QQ39sKYz1DVdfLOMCcsY1lm4l/6WTOYQmfoNCuYe00fcO/
6zuc/fhWSaB/AZE9NUe4XoNkDIZ6n13+Iu8CRjFzdKWiTWjezOI/tSZY/HK+qQVj
6BFeydSPq3g3J/wxrB5aTKLcl3fGIwquLXeGenoMQQKBgQCWULypEeQwJsyKB57P
JzuCnFMvLL5qSNwot5c7I+AX5yi368dEurQl6pUUJ9VKNbpsUxFIMq9AHpddDoq/
+nIVt1DYr55ZsUJ6SgYtjvCMT9WOE/1Kqfh6p6y/mgRUl8m6v6gqi5/RfsNWJwfl
iBXhcGCQfkwZ8YIUyTW89qrwMw==
-----END PRIVATE KEY-----`

const testTLSCertificate = `
-----BEGIN CERTIFICATE-----
MIIDVDCCAjwCCQCwsE+LGRRtBTANBgkqhkiG9w0BAQsFADBsMQswCQYDVQQGEwJV
UzERMA8GA1UECAwITWljaGlnYW4xFTATBgNVBAcMDEdyYW5kIFJhcGlkczERMA8G
A1UECgwIb2JzZXJ2aVExDzANBgNVBAsMBlN0YW56YTEPMA0GA1UEAwwGU3Rhbnph
MB4XDTIxMDIyNTE3MzgxM1oXDTQ4MDcxMjE3MzgxM1owbDELMAkGA1UEBhMCVVMx
ETAPBgNVBAgMCE1pY2hpZ2FuMRUwEwYDVQQHDAxHcmFuZCBSYXBpZHMxETAPBgNV
BAoMCG9ic2VydmlRMQ8wDQYDVQQLDAZTdGFuemExDzANBgNVBAMMBlN0YW56YTCC
ASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAN011VEcOg6XCtBg2XM/ox0s
Qi83Tc+/cD8J3LLwuAqBIR/eSbDyj/vm1JDMgBHq/u2z+qrWIDATc4uzivyUBKPb
CKsXZ83FfXY68g/yDeauzi0IA0GHjXcjLBUpwQ6uvLvX+zWTv6Gxwy8V7c0vEPWY
0omiDArKJ+9JzXnPHyB7xIuOnzvk2IsKeLZhhZcG3Wmp3jAKiXyVfq/e7ats1etB
/awudwPQITjkRYefMvMrfdrQHDHJNcA5132SKd0vYgVy30LYdFASTmEZpds03R/o
Qw5k7Np0kd8yV50pUnVc5363+Ro/Njg0sNoZp3Tik1r6OmHniohUvluAJM6K9fUC
AwEAATANBgkqhkiG9w0BAQsFAAOCAQEA0u061goAXX7RxtdRO7Twz4zZIGS/oWvn
gj61zZIXt8LaTzRZFU9rs0rp7jPXKaszArJQc29anf1mWtRwQBAY0S0m4DkwoBln
7hMFf9MlisQvBVFjWgDo7QCJJmAxaPc1NZi8GQIANEMMZ+hLK17dhDB+6SdBbV4R
yx+7I3zcXQ+0H4Aym6KmvoIR3QAXsOYJ/43QzlYU63ryGYBAeg+JiD8fnr2W3QHb
BBdatHmcazlytT5KV+bANT/Ermw8y2tpWGWxMxQHveFh1zThYL8vkLi4fmZqqVCI
zv9WEy+9p05Aet+12x3dzRu93+yRIEYbSZ35NOUWfQ+gspF5rGgpxA==
-----END CERTIFICATE-----`

func tcpInputTest(input []byte, expected []string) func(t *testing.T) {
return func(t *testing.T) {
cfg := NewTCPInputConfig("test_id")
Expand Down Expand Up @@ -73,11 +127,83 @@ func tcpInputTest(input []byte, expected []string) func(t *testing.T) {
}
}

func tlsTCPInputTest(input []byte, expected []string) func(t *testing.T) {
return func(t *testing.T) {

f, err := os.Create("test.crt")
require.NoError(t, err)
defer f.Close()
defer os.Remove("test.crt")
_, err = f.WriteString(testTLSCertificate + "\n")
require.NoError(t, err)
f.Close()

f, err = os.Create("test.key")
require.NoError(t, err)
defer f.Close()
defer os.Remove("test.key")
_, err = f.WriteString(testTLSPrivateKey + "\n")
require.NoError(t, err)
f.Close()

cfg := NewTCPInputConfig("test_id")
cfg.ListenAddress = ":0"
cfg.TLS.Enable = true
cfg.TLS.Certificate = "test.crt"
cfg.TLS.PrivateKey = "test.key"

ops, err := cfg.Build(testutil.NewBuildContext(t))
require.NoError(t, err)
op := ops[0]

mockOutput := testutil.Operator{}
tcpInput := op.(*TCPInput)
tcpInput.InputOperator.OutputOperators = []operator.Operator{&mockOutput}

entryChan := make(chan *entry.Entry, 1)
mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
entryChan <- args.Get(1).(*entry.Entry)
}).Return(nil)

err = tcpInput.Start()
require.NoError(t, err)
defer tcpInput.Stop()

conn, err := tls.Dial("tcp", tcpInput.listener.Addr().String(), &tls.Config{InsecureSkipVerify: true})
require.NoError(t, err)
defer conn.Close()

_, err = conn.Write(input)
require.NoError(t, err)

for _, expectedMessage := range expected {
select {
case entry := <-entryChan:
require.Equal(t, expectedMessage, entry.Record)
case <-time.After(time.Second):
require.FailNow(t, "Timed out waiting for message to be written")
}
}

select {
case entry := <-entryChan:
require.FailNow(t, "Unexpected entry: %s", entry)
case <-time.After(100 * time.Millisecond):
return
}
}
}

func TestTcpInput(t *testing.T) {
t.Run("Simple", tcpInputTest([]byte("message\n"), []string{"message"}))
t.Run("CarriageReturn", tcpInputTest([]byte("message\r\n"), []string{"message"}))
}

func TestTLSTcpInput(t *testing.T) {
t.Run("Simple", tlsTCPInputTest([]byte("message\n"), []string{"message"}))
t.Run("CarriageReturn", tlsTCPInputTest([]byte("message\r\n"), []string{"message"}))
}

func BenchmarkTcpInput(b *testing.B) {
cfg := NewTCPInputConfig("test_id")
cfg.ListenAddress = ":0"
Expand Down

0 comments on commit a2b90b7

Please sign in to comment.