Skip to content

Commit

Permalink
Add HTTP server output (#19)
Browse files Browse the repository at this point in the history
* HTTP output

* Rename output and stop in sigint and sigterm properly

* Add tests

* Various Makefile fixes

* Add http server output to the readme

* Avoid race condition in test
  • Loading branch information
marc-gr authored May 28, 2021
1 parent d013691 commit b890eac
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 24 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ This project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

- Added HTTP Server output. [#19](https://github.com/elastic/stream/pull/19)

## [0.3.0]

- Added `--rate-limit` flag to control rate (in bytes/sec) of UDP streaming. [#12](https://github.com/elastic/stream/pull/12)
Expand Down
11 changes: 4 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
LICENSE := ASL2-Short
VERSION ?= latest
VERSION ?= local

check-fmt: goimports go-licenser
@go-licenser -d -license ${LICENSE}
@goimports -l -e -local github.com/andrewkroh . | read && echo "Code differs from gofmt's style. Run 'gofmt -w .'" 1>&2 && exit 1 || true
@goimports -l -e -local github.com/elastic . | read && echo "Code differs from gofmt's style. Run 'gofmt -w .'" 1>&2 && exit 1 || true

docker:
docker build -t akroh/stream:${VERSION} .

publish: docker
docker push akroh/stream:${VERSION}
docker build -t docker.elastic.co/observability/stream:${VERSION} .

fmt: goimports go-licenser
go-licenser -license ${LICENSE}
goimports -l -w -local github.com/andrewkroh .
goimports -l -w -local github.com/elastic .

goimports:
GO111MODULE=off go get golang.org/x/tools/cmd/goimports
Expand Down
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,11 @@

[![Build Status](https://beats-ci.elastic.co/job/Library/job/stream-mbp/job/main/badge/icon)](https://beats-ci.elastic.co/job/Library/job/stream-mbp/job/main/)

stream is a test utility for streaming data via udp/tcp/tls/webhook/GCP Pub-Sub.
stream is a test utility for streaming data via:

- UDP
- TCP
- TLS
- Webhook
- GCP Pub-Sub
- HTTP Server
29 changes: 14 additions & 15 deletions command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,23 @@ import (
"os"
"os/signal"
"strings"
"time"

"github.com/elastic/go-concert/ctxtool/osctx"
"github.com/elastic/go-concert/timed"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sys/unix"

"github.com/elastic/go-concert/ctxtool/osctx"
"github.com/elastic/go-concert/timed"

"github.com/elastic/stream/pkg/log"
"github.com/elastic/stream/pkg/output"

// Register outputs.
_ "github.com/elastic/stream/pkg/output/gcppubsub"
_ "github.com/elastic/stream/pkg/output/httpserver"
_ "github.com/elastic/stream/pkg/output/tcp"
_ "github.com/elastic/stream/pkg/output/tls"
_ "github.com/elastic/stream/pkg/output/udp"
Expand All @@ -43,7 +46,7 @@ func Execute() error {
}

func ExecuteContext(ctx context.Context) error {
logger, err := logger()
logger, err := log.NewLogger()
if err != nil {
return nil
}
Expand Down Expand Up @@ -72,6 +75,13 @@ func ExecuteContext(ctx context.Context) error {
rootCmd.PersistentFlags().StringVar(&opts.GCPPubsubOptions.Subscription, "gcppubsub-subscription", "subscription", "GCP Pubsub subscription name")
rootCmd.PersistentFlags().BoolVar(&opts.GCPPubsubOptions.Clear, "gcppubsub-clear", true, "GCP Pubsub clear flag")

// HTTP output flags.
rootCmd.PersistentFlags().DurationVar(&opts.HTTPServerOptions.ReadTimeout, "http-server-read-timeout", 5*time.Second, "HTTP Server read timeout")
rootCmd.PersistentFlags().DurationVar(&opts.HTTPServerOptions.WriteTimeout, "http-server-write-timeout", 5*time.Second, "HTTP Server write timeout")
rootCmd.PersistentFlags().StringVar(&opts.HTTPServerOptions.TLSCertificate, "http-server-tls-cert", "", "Path to the TLS certificate")
rootCmd.PersistentFlags().StringVar(&opts.HTTPServerOptions.TLSKey, "http-server-tls-key", "", "Path to the TLS key file")
rootCmd.PersistentFlags().StringArrayVar(&opts.HTTPServerOptions.ResponseHeaders, "http-server-response-headers", []string{"content-type", "application/json"}, "List of headers key-values")

// Sub-commands.
rootCmd.AddCommand(newLogRunner(&opts, logger))
rootCmd.AddCommand(newPCAPRunner(&opts, logger))
Expand All @@ -91,17 +101,6 @@ func ExecuteContext(ctx context.Context) error {
return rootCmd.ExecuteContext(ctx)
}

func logger() (*zap.Logger, error) {
conf := zap.NewProductionConfig()
conf.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
conf.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
log, err := conf.Build()
if err != nil {
return nil, err
}
return log, nil
}

func waitForStartSignal(opts *output.Options, parent context.Context, logger *zap.Logger) error {
if opts.StartSignal == "" {
return nil
Expand Down
21 changes: 21 additions & 0 deletions pkg/log/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Licensed to Elasticsearch B.V. under one or more agreements.
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

package log

import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

func NewLogger() (*zap.Logger, error) {
conf := zap.NewProductionConfig()
conf.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
conf.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
log, err := conf.Build()
if err != nil {
return nil, err
}
return log, nil
}
134 changes: 134 additions & 0 deletions pkg/output/httpserver/httpserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Licensed to Elasticsearch B.V. under one or more agreements.
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
package httpserver

import (
"context"
"errors"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"

"go.uber.org/zap"

"github.com/elastic/stream/pkg/log"
"github.com/elastic/stream/pkg/output"
)

func init() {
output.Register("http-server", New)
}

type Output struct {
logger *zap.SugaredLogger
opts *output.Options
server *http.Server
logChan chan []byte
ctx context.Context
}

func New(opts *output.Options) (output.Output, error) {
if opts.Addr == "" {
return nil, errors.New("a listen address is required")
}

if !(opts.HTTPServerOptions.TLSCertificate == "" && opts.HTTPServerOptions.TLSKey == "") &&
!(opts.HTTPServerOptions.TLSCertificate != "" && opts.HTTPServerOptions.TLSKey != "") {
return nil, errors.New("both TLS certificate and key files must be defined")
}

if len(opts.HTTPServerOptions.ResponseHeaders)%2 != 0 {
return nil, errors.New("response headers must be a list of pairs")
}

logger, err := log.NewLogger()
if err != nil {
return nil, err
}
slogger := logger.Sugar().With("output", "http-server")

logChan := make(chan []byte)
server := &http.Server{
Addr: opts.Addr,
ReadTimeout: opts.HTTPServerOptions.ReadTimeout,
WriteTimeout: opts.HTTPServerOptions.WriteTimeout,
MaxHeaderBytes: 1 << 20,
Handler: newHandler(opts, logChan, slogger),
}

return &Output{
logger: slogger,
opts: opts,
server: server,
logChan: logChan,
}, nil
}

func (o *Output) DialContext(ctx context.Context) error {
o.ctx = ctx

if o.opts.TLSCertificate != "" && o.opts.TLSKey != "" {
go func() { o.logger.Info(o.server.ListenAndServeTLS(o.opts.TLSCertificate, o.opts.TLSKey)) }()
} else {
go func() { o.logger.Info(o.server.ListenAndServe()) }()
}

return nil
}

func (o *Output) Close() error {
defer close(o.logChan)

o.logger.Infow("shutting down http_server...")

ctx, cancel := context.WithTimeout(o.ctx, time.Second)
defer cancel()

return o.server.Shutdown(ctx)
}

func (o *Output) Write(b []byte) (int, error) {
if o.ctx == nil {
return 0, errors.New("DialContext needs to be called before Write can be used")
}

select {
case <-o.ctx.Done():
o.logger.Infow("the output has been closed")
return 0, nil
case o.logChan <- b:
return len(b), nil
}
}

func newHandler(opts *output.Options, logChan <-chan []byte, logger *zap.SugaredLogger) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
b := <-logChan

defer r.Body.Close()
logger.Debug(strRequest(r))

for i := 0; i < len(opts.HTTPServerOptions.ResponseHeaders); i += 2 {
w.Header().Add(opts.HTTPServerOptions.ResponseHeaders[i], opts.HTTPServerOptions.ResponseHeaders[i+1])
}

_, _ = w.Write(b)
}
}

func strRequest(r *http.Request) string {
var b strings.Builder
b.WriteString("Request path: ")
b.WriteString(r.URL.String())
b.WriteString(", Request Headers: ")
for k, v := range r.Header {
b.WriteString(fmt.Sprintf("'%s: %s' ", k, v))
}
b.WriteString(", Request Body: ")
body, _ := ioutil.ReadAll(r.Body)
b.Write(body)
return b.String()
}
101 changes: 101 additions & 0 deletions pkg/output/httpserver/httpserver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Licensed to Elasticsearch B.V. under one or more agreements.
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

package httpserver

import (
"context"
"io/ioutil"
"net/http"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/stream/pkg/output"
)

func TestHTTPServer(t *testing.T) {
cases := []struct {
description string
opts output.HTTPServerOptions
input []string
expectedOutput []string
expectedHeaders http.Header
}{
{
description: "can get one log per response",
input: []string{"a", "b", "c"},
expectedOutput: []string{"a", "b", "c"},
},
{
description: "returns expected response headers",
opts: output.HTTPServerOptions{
ResponseHeaders: []string{"content-type", "custom"},
},
input: []string{"a"},
expectedOutput: []string{"a"},
expectedHeaders: http.Header{
"Content-Type": []string{"custom"},
},
},
}

for _, tc := range cases {
tc := tc
t.Run(tc.description, func(t *testing.T) {
out, err := New(&output.Options{
Addr: "127.0.0.1:1111",
HTTPServerOptions: tc.opts,
})

require.NoError(t, err)
require.NoError(t, out.DialContext(context.Background()))

for i, in := range tc.input {
var n int
var werr error
var wg sync.WaitGroup
trigger := make(chan struct{})
wg.Add(1)
go func(in string) {
defer wg.Done()

timeout := time.NewTimer(time.Second)
defer timeout.Stop()

select {
case <-timeout.C:
default:
close(trigger)
n, werr = out.Write([]byte(in))
}
}(in)

<-trigger

resp, err := http.Get("http://127.0.0.1:1111")
require.NoError(t, err)
t.Cleanup(func() { resp.Body.Close() })

wg.Wait()
require.NoError(t, werr)
assert.Equal(t, len(in), n)

body, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)

assert.Equal(t, tc.expectedOutput[i], string(body))

for h, vs := range tc.expectedHeaders {
assert.EqualValues(t, vs, resp.Header[h])
}
}

require.NoError(t, out.Close())
})
}
}
9 changes: 9 additions & 0 deletions pkg/output/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Options struct {

WebhookOptions
GCPPubsubOptions
HTTPServerOptions
}

type WebhookOptions struct {
Expand All @@ -32,3 +33,11 @@ type GCPPubsubOptions struct {
Subscription string // Subscription name. Will create it if not exists.
Clear bool // Clear will clear all topics and subscriptions before running.
}

type HTTPServerOptions struct {
TLSCertificate string // TLS certificate file path.
TLSKey string // TLS key file path.
ResponseHeaders []string // KV list of response headers.
ReadTimeout time.Duration // HTTP Server read timeout.
WriteTimeout time.Duration // HTTP Server write timeout.
}
Loading

0 comments on commit b890eac

Please sign in to comment.