Skip to content

Commit

Permalink
Add TLS support to TCP Input (WIP) (#253)
Browse files Browse the repository at this point in the history
* add tls support to tcp input operator

* declare listener and err

* configure certificate before passing to TCPInput type

* test tls tcp listener by creating tls cert and key during the test

* add tls configuration

* Added TLS support to tcp input operator
  • Loading branch information
Joseph Sirianni authored Feb 25, 2021
1 parent 2e6b0b2 commit ba2b847
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 10 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.13.14] - Unreleased

### Changed
- Added TLS support to tcp input operator [pr253](https://github.com/observIQ/stanza/pull/253)

## [0.13.13] - 2021-02-18

### Added
Expand Down
12 changes: 12 additions & 0 deletions docs/operators/tcp_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,22 @@ The `tcp_input` operator listens for logs on one or more TCP connections. The op
| `id` | `tcp_input` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `listen_address` | required | A listen address of the form `<ip>:<port>` |
| `tls` | | An optional `TLS` configuration (see the TLS configuration section) |
| `write_to` | $ | The record [field](/docs/types/field.md) written to when creating a new log entry |
| `labels` | {} | A map of `key: value` labels to add to the entry's labels |
| `resource` | {} | A map of `key: value` labels to add to the entry's resource |

#### TLS Configuration

The `tcp_input` operator supports TLS, disabled by default.

| Field | Default | Description |
| --- | --- | --- |
| `enable` | `false` | Boolean value to enable or disable TLS |
| `certificate` | | File path for the X509 certificate chain |
| `private_key` | | File path for the X509 private key |


### Example Configurations

#### Simple
Expand Down
78 changes: 68 additions & 10 deletions operator/builtin/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"fmt"
"net"
"sync"
"time"
"crypto/rand"
"crypto/tls"

"github.com/observiq/stanza/operator"
"github.com/observiq/stanza/operator/helper"
Expand All @@ -27,7 +30,20 @@ func NewTCPInputConfig(operatorID string) *TCPInputConfig {
type TCPInputConfig struct {
helper.InputConfig `yaml:",inline"`

ListenAddress string `json:"listen_address,omitempty" yaml:"listen_address,omitempty"`
ListenAddress string `json:"listen_address,omitempty" yaml:"listen_address,omitempty"`
TLS TLSConfig `json:"tls,omitempty" yaml:"tls,omitempty"`
}

// TLSConfig is the configuration for a TLS listener
type TLSConfig struct {
// Enable forces the user of TLS
Enable bool `json:"enable,omitempty" yaml:"enable,omitempty"`

// Certificate is the file path for the certificate
Certificate string `json:"certificate,omitempty" yaml:"certificate,omitempty"`

// PrivateKey is the file path for the private key
PrivateKey string `json:"private_key,omitempty" yaml:"private_key,omitempty"`
}

// Build will build a tcp input operator.
Expand All @@ -41,42 +57,84 @@ func (c TCPInputConfig) Build(context operator.BuildContext) ([]operator.Operato
return nil, fmt.Errorf("missing required parameter 'listen_address'")
}

address, err := net.ResolveTCPAddr("tcp", c.ListenAddress)
if err != nil {
// validate the input address
if _, err := net.ResolveTCPAddr("tcp", c.ListenAddress); err != nil {
return nil, fmt.Errorf("failed to resolve listen_address: %s", err)
}

cert := tls.Certificate{}
if c.TLS.Enable {
if c.TLS.Certificate == "" {
return nil, fmt.Errorf("missing required parameter 'certificate', required when TLS is enabled")
}

if c.TLS.PrivateKey == "" {
return nil, fmt.Errorf("missing required parameter 'private_key', required when TLS is enabled")
}

c, err := tls.LoadX509KeyPair(c.TLS.Certificate, c.TLS.PrivateKey)
if err != nil {
return nil, fmt.Errorf("failed to load tls certificate: %w", err)
}
cert = c
}

tcpInput := &TCPInput{
InputOperator: inputOperator,
address: address,
address: c.ListenAddress,
tlsEnable: c.TLS.Enable,
tlsKeyPair: cert,
}
return []operator.Operator{tcpInput}, nil
}

// TCPInput is an operator that listens for log entries over tcp.
type TCPInput struct {
helper.InputOperator
address *net.TCPAddr
address string
tlsEnable bool
tlsKeyPair tls.Certificate

listener *net.TCPListener
listener net.Listener
cancel context.CancelFunc
wg sync.WaitGroup
}

// Start will start listening for log entries over tcp.
func (t *TCPInput) Start() error {
listener, err := net.ListenTCP("tcp", t.address)
if err != nil {
if err := t.configureListener(); err != nil {
return fmt.Errorf("failed to listen on interface: %w", err)
}

t.listener = listener
ctx, cancel := context.WithCancel(context.Background())
t.cancel = cancel
t.goListen(ctx)
return nil
}

func (t *TCPInput) configureListener() error {
if ! t.tlsEnable {
listener, err := net.Listen("tcp", t.address)
if err != nil {
return fmt.Errorf("failed to configure tcp listener: %w", err)
}
t.listener = listener
return nil
}

config := tls.Config{Certificates: []tls.Certificate{t.tlsKeyPair}}
config.Time = func() time.Time { return time.Now() }
config.Rand = rand.Reader

listener, err := tls.Listen("tcp", t.address, &config)
if err != nil {
return fmt.Errorf("failed to configure tls listener: %w", err)
}

t.listener = listener
return nil
}

// goListenn will listen for tcp connections.
func (t *TCPInput) goListen(ctx context.Context) {
t.wg.Add(1)
Expand All @@ -85,7 +143,7 @@ func (t *TCPInput) goListen(ctx context.Context) {
defer t.wg.Done()

for {
conn, err := t.listener.AcceptTCP()
conn, err := t.listener.Accept()
if err != nil {
select {
case <-ctx.Done():
Expand Down
132 changes: 132 additions & 0 deletions operator/builtin/input/tcp/tcp_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package tcp

import (
"os"
"net"
"testing"
"time"
"crypto/tls"

"github.com/observiq/stanza/entry"
"github.com/observiq/stanza/operator"
Expand All @@ -12,6 +14,61 @@ import (
"github.com/stretchr/testify/require"
)

const testTLSPrivateKey = `
-----BEGIN PRIVATE KEY-----
MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDjKFqtAaZ/Uj53
Wk2r0xfH9IQqxCjH9gYFI+Kblf8Jvk2sZrQZbLrdjdHsZz/rLrt1YvvBSZ5LtkzK
P99wEi6goESCL0FmYP6Jkg0AKOrnfs8AGX/6PyQPfHBeK/767YV47ug0wJT2/92U
1K2rHIb154rPLp3l1kZyUqIj9MNphjW1jZ62mo2Jp4HkcivjR/cN8jz5UjQHBvO7
KMbhWDc0GLukxoctw/JmigqIrEFqfGcjANTzQZjwqIdscHkVEY8OLzQn+vIiXEF3
9VqLIDRjp3vExWXyVLcLZ1T0rQ84ICE1rBmeHfkteZlYPnuM92kmz/erXOCoDreY
1nwyp1YbAgMBAAECggEAG7RbQsh1vweP2MyptnAbcWawC+s6shCecVgMFj+4CD8u
h/1Kr+Mj80uNs9Bv6kYb1NhKritFZCSKvwwFO0zsZOjHEj2jM1JXGP44GbHj3HIJ
2xBBHItA4aaiqcmh4oa/hZ1VssFeKbXRF4rM15a2Gx2vP0+HMHXux5iub8Y1YxQh
eieSgLlMB4KXrqoosPCPZwSPrh7OzmapUPCeBFnVGUcy4UCRm3HL8RkSsiGJCRk5
mlevdKCy1YEGAojSAJrQOD6vrIXfoB+W0HUfkMdqvCfOQ00szTG9OTKK+tgBc55P
iI9IyNu6J7JxXC6iF/Z5CuXxiHztWz1drios/zKT2QKBgQD1NKqLtAgo1TsGnopK
3I6OTkCFVwZvw90wE0KhtHFAneIe6E2Q8bS9I7fFFlDVjRe4YvuyMYxoRdAlIp+r
qgl/18GZh2xuYcE081EbXSDdSu0yh5AGvZ5QRZO5DwngJZKbFYYPy/7UPO5m2WdV
Sa3TJMVzU7Fndfg1PVQL24snFQKBgQDtKEvAuHclOz3oGJdZYmmVmf+WTZDzFKS6
ms06kjYvqDxO5MgLJgLdaVnBpRUEttrwjKt+7F058vHk0RNOs5zmNwH13koIs2c6
w93ttBltNanoB9X1BWv8qntuHdjad2qsLdSUf2B7JT4i0FHnb9H+P0+m0qSHQCg5
KAuLriTUbwKBgEuDr64cgJLKsEXml2JcsE5lDPvDhEjxQfInTFLudh5XQScRlam4
tle1Y0gACl7p988iNK95EOuf7G0zT4cXc5t6f7XffeY0lsLO2ECcGp3sEEaKdzGM
PfAsrUTFu93a1F6Mb1/4C/+i0Cy+cVNTwIORBHny4WSicRE8VODd+OnNAoGBAK/7
zvrb584BABdS6Dy0ApW5CSiHtqArGXI/nTtxdDQ5K0eADdH4CvgyTSCdV9N/vUfz
mu88hpGR7l5Vp3YnYq6S8yl4IogCWQAKiIzzsEqSH9rGtcZ0l4WPHLjB/UFgjA/o
km7/dqDrKgi7fYu4NqPsZzbr6JtUyIRhau/j8gCRAoGBAPBptqrwdz39Sx7L1i29
nIEssRVQ8XKJoCwcVCtUDYCRtK6SNkac9I712ShW9MiSkwk2YrVGW/tZbyK/wMd0
cFseuHGPmUhW273or666QdFttgPhvtpy0ttMO9cp0px8SzT6ZNlFWHYtoh07fJWC
Zd4aQ9iUbXs0rMIV+0EMrxRf
-----END PRIVATE KEY-----
`

const testTLSCertificate = `
-----BEGIN CERTIFICATE-----
MIIDVDCCAjwCCQDA9fUVDYKppDANBgkqhkiG9w0BAQsFADBsMQswCQYDVQQGEwJV
UzERMA8GA1UECAwITWljaGlnYW4xFTATBgNVBAcMDEdyYW5kIFJhcGlkczERMA8G
A1UECgwIb2JzZXJ2SVExDzANBgNVBAsMBmdpdGh1YjEPMA0GA1UEAwwGc3Rhbnph
MB4XDTIxMDIyNDE0NTQ0OVoXDTIxMDMyNjE0NTQ0OVowbDELMAkGA1UEBhMCVVMx
ETAPBgNVBAgMCE1pY2hpZ2FuMRUwEwYDVQQHDAxHcmFuZCBSYXBpZHMxETAPBgNV
BAoMCG9ic2VydklRMQ8wDQYDVQQLDAZnaXRodWIxDzANBgNVBAMMBnN0YW56YTCC
ASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAOMoWq0Bpn9SPndaTavTF8f0
hCrEKMf2BgUj4puV/wm+TaxmtBlsut2N0exnP+suu3Vi+8FJnku2TMo/33ASLqCg
RIIvQWZg/omSDQAo6ud+zwAZf/o/JA98cF4r/vrthXju6DTAlPb/3ZTUraschvXn
is8uneXWRnJSoiP0w2mGNbWNnraajYmngeRyK+NH9w3yPPlSNAcG87soxuFYNzQY
u6TGhy3D8maKCoisQWp8ZyMA1PNBmPCoh2xweRURjw4vNCf68iJcQXf1WosgNGOn
e8TFZfJUtwtnVPStDzggITWsGZ4d+S15mVg+e4z3aSbP96tc4KgOt5jWfDKnVhsC
AwEAATANBgkqhkiG9w0BAQsFAAOCAQEAJRGMTrn7d4xFmQNzpApSSae3fkxVgV9Y
MytgjowvLV9vYarM0Pc/u64SMcx5z3wfMIkbOtF/dPZDzR3bt26Dr1rGBfx97grG
esKfxurrxdqxMiqTRj8MO7mKPa9NwO0M1BR4T29jnoKVcjy8zSlWO0ROAtZmbM74
ez+cfG6859ZLaFZZwY2H0lE4GzFlmkA1FuoR2biyUzRuCH4hMGrHZeiS8KR5ltn2
C/soJcXCDxtHbbfeDKclyRIIpwsXxGfaWehysMcfZavzJ0ZZioeilwdAZK7PcLY8
Y3YVtmCDXFa0Hy0jPMN4UMSvPmxRbcVpGSoEx2qnfOqHGmjrKcJ1kA==
-----END CERTIFICATE-----`



func tcpInputTest(input []byte, expected []string) func(t *testing.T) {
return func(t *testing.T) {
cfg := NewTCPInputConfig("test_id")
Expand Down Expand Up @@ -59,11 +116,86 @@ func tcpInputTest(input []byte, expected []string) func(t *testing.T) {
}
}

func tlsTCPInputTest(input []byte, expected []string) func(t *testing.T) {
return func(t *testing.T) {

f, err := os.Create("test.crt")
require.NoError(t, err)
defer f.Close()
defer os.Remove("test.crt")
_, err = f.WriteString(testTLSCertificate + "\n")
require.NoError(t, err)
f.Close()

f, err = os.Create("test.key")
require.NoError(t, err)
defer f.Close()
defer os.Remove("test.key")
_, err = f.WriteString(testTLSPrivateKey + "\n")
require.NoError(t, err)
f.Close()




cfg := NewTCPInputConfig("test_id")
cfg.ListenAddress = ":0"
cfg.TLS.Enable = true
cfg.TLS.Certificate = "test.crt"
cfg.TLS.PrivateKey = "test.key"

ops, err := cfg.Build(testutil.NewBuildContext(t))
require.NoError(t, err)
op := ops[0]

mockOutput := testutil.Operator{}
tcpInput := op.(*TCPInput)
tcpInput.InputOperator.OutputOperators = []operator.Operator{&mockOutput}

entryChan := make(chan *entry.Entry, 1)
mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
entryChan <- args.Get(1).(*entry.Entry)
}).Return(nil)

err = tcpInput.Start()
require.NoError(t, err)
defer tcpInput.Stop()

conn, err := tls.Dial("tcp", tcpInput.listener.Addr().String(), &tls.Config{InsecureSkipVerify: true})
require.NoError(t, err)
defer conn.Close()

_, err = conn.Write(input)
require.NoError(t, err)

for _, expectedMessage := range expected {
select {
case entry := <-entryChan:
require.Equal(t, expectedMessage, entry.Record)
case <-time.After(time.Second):
require.FailNow(t, "Timed out waiting for message to be written")
}
}

select {
case entry := <-entryChan:
require.FailNow(t, "Unexpected entry: %s", entry)
case <-time.After(100 * time.Millisecond):
return
}
}
}

func TestTcpInput(t *testing.T) {
t.Run("Simple", tcpInputTest([]byte("message\n"), []string{"message"}))
t.Run("CarriageReturn", tcpInputTest([]byte("message\r\n"), []string{"message"}))
}

func TestTLSTcpInput(t *testing.T) {
t.Run("Simple", tlsTCPInputTest([]byte("message\n"), []string{"message"}))
t.Run("CarriageReturn", tlsTCPInputTest([]byte("message\r\n"), []string{"message"}))
}

func BenchmarkTcpInput(b *testing.B) {
cfg := NewTCPInputConfig("test_id")
cfg.ListenAddress = ":0"
Expand Down

0 comments on commit ba2b847

Please sign in to comment.