diff --git a/CHANGELOG.md b/CHANGELOG.md index a4ce9b4..39cf89d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ This project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +- Added HTTP Server output. [#19](https://github.com/elastic/stream/pull/19) + ## [0.3.0] - Added `--rate-limit` flag to control rate (in bytes/sec) of UDP streaming. [#12](https://github.com/elastic/stream/pull/12) diff --git a/Makefile b/Makefile index 387a1bb..ba5883a 100644 --- a/Makefile +++ b/Makefile @@ -1,19 +1,16 @@ LICENSE := ASL2-Short -VERSION ?= latest +VERSION ?= local check-fmt: goimports go-licenser @go-licenser -d -license ${LICENSE} - @goimports -l -e -local github.com/andrewkroh . | read && echo "Code differs from gofmt's style. Run 'gofmt -w .'" 1>&2 && exit 1 || true + @goimports -l -e -local github.com/elastic . | read && echo "Code differs from gofmt's style. Run 'gofmt -w .'" 1>&2 && exit 1 || true docker: - docker build -t akroh/stream:${VERSION} . - -publish: docker - docker push akroh/stream:${VERSION} + docker build -t docker.elastic.co/observability/stream:${VERSION} . fmt: goimports go-licenser go-licenser -license ${LICENSE} - goimports -l -w -local github.com/andrewkroh . + goimports -l -w -local github.com/elastic . goimports: GO111MODULE=off go get golang.org/x/tools/cmd/goimports diff --git a/README.md b/README.md index ed32f32..819c505 100644 --- a/README.md +++ b/README.md @@ -2,4 +2,11 @@ [![Build Status](https://beats-ci.elastic.co/job/Library/job/stream-mbp/job/main/badge/icon)](https://beats-ci.elastic.co/job/Library/job/stream-mbp/job/main/) -stream is a test utility for streaming data via udp/tcp/tls/webhook/GCP Pub-Sub. +stream is a test utility for streaming data via: + +- UDP +- TCP +- TLS +- Webhook +- GCP Pub-Sub +- HTTP Server diff --git a/command/root.go b/command/root.go index 3d8b509..253b668 100644 --- a/command/root.go +++ b/command/root.go @@ -10,20 +10,23 @@ import ( "os" "os/signal" "strings" + "time" - "github.com/elastic/go-concert/ctxtool/osctx" - "github.com/elastic/go-concert/timed" "github.com/spf13/cobra" "github.com/spf13/pflag" "go.uber.org/multierr" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "golang.org/x/sys/unix" + "github.com/elastic/go-concert/ctxtool/osctx" + "github.com/elastic/go-concert/timed" + + "github.com/elastic/stream/pkg/log" "github.com/elastic/stream/pkg/output" // Register outputs. _ "github.com/elastic/stream/pkg/output/gcppubsub" + _ "github.com/elastic/stream/pkg/output/httpserver" _ "github.com/elastic/stream/pkg/output/tcp" _ "github.com/elastic/stream/pkg/output/tls" _ "github.com/elastic/stream/pkg/output/udp" @@ -43,7 +46,7 @@ func Execute() error { } func ExecuteContext(ctx context.Context) error { - logger, err := logger() + logger, err := log.NewLogger() if err != nil { return nil } @@ -72,6 +75,13 @@ func ExecuteContext(ctx context.Context) error { rootCmd.PersistentFlags().StringVar(&opts.GCPPubsubOptions.Subscription, "gcppubsub-subscription", "subscription", "GCP Pubsub subscription name") rootCmd.PersistentFlags().BoolVar(&opts.GCPPubsubOptions.Clear, "gcppubsub-clear", true, "GCP Pubsub clear flag") + // HTTP output flags. + rootCmd.PersistentFlags().DurationVar(&opts.HTTPServerOptions.ReadTimeout, "http-server-read-timeout", 5*time.Second, "HTTP Server read timeout") + rootCmd.PersistentFlags().DurationVar(&opts.HTTPServerOptions.WriteTimeout, "http-server-write-timeout", 5*time.Second, "HTTP Server write timeout") + rootCmd.PersistentFlags().StringVar(&opts.HTTPServerOptions.TLSCertificate, "http-server-tls-cert", "", "Path to the TLS certificate") + rootCmd.PersistentFlags().StringVar(&opts.HTTPServerOptions.TLSKey, "http-server-tls-key", "", "Path to the TLS key file") + rootCmd.PersistentFlags().StringArrayVar(&opts.HTTPServerOptions.ResponseHeaders, "http-server-response-headers", []string{"content-type", "application/json"}, "List of headers key-values") + // Sub-commands. rootCmd.AddCommand(newLogRunner(&opts, logger)) rootCmd.AddCommand(newPCAPRunner(&opts, logger)) @@ -91,17 +101,6 @@ func ExecuteContext(ctx context.Context) error { return rootCmd.ExecuteContext(ctx) } -func logger() (*zap.Logger, error) { - conf := zap.NewProductionConfig() - conf.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder - conf.Level = zap.NewAtomicLevelAt(zap.DebugLevel) - log, err := conf.Build() - if err != nil { - return nil, err - } - return log, nil -} - func waitForStartSignal(opts *output.Options, parent context.Context, logger *zap.Logger) error { if opts.StartSignal == "" { return nil diff --git a/pkg/log/log.go b/pkg/log/log.go new file mode 100644 index 0000000..409106a --- /dev/null +++ b/pkg/log/log.go @@ -0,0 +1,21 @@ +// Licensed to Elasticsearch B.V. under one or more agreements. +// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +package log + +import ( + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +func NewLogger() (*zap.Logger, error) { + conf := zap.NewProductionConfig() + conf.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder + conf.Level = zap.NewAtomicLevelAt(zap.DebugLevel) + log, err := conf.Build() + if err != nil { + return nil, err + } + return log, nil +} diff --git a/pkg/output/httpserver/httpserver.go b/pkg/output/httpserver/httpserver.go new file mode 100644 index 0000000..de494de --- /dev/null +++ b/pkg/output/httpserver/httpserver.go @@ -0,0 +1,134 @@ +// Licensed to Elasticsearch B.V. under one or more agreements. +// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. +package httpserver + +import ( + "context" + "errors" + "fmt" + "io/ioutil" + "net/http" + "strings" + "time" + + "go.uber.org/zap" + + "github.com/elastic/stream/pkg/log" + "github.com/elastic/stream/pkg/output" +) + +func init() { + output.Register("http-server", New) +} + +type Output struct { + logger *zap.SugaredLogger + opts *output.Options + server *http.Server + logChan chan []byte + ctx context.Context +} + +func New(opts *output.Options) (output.Output, error) { + if opts.Addr == "" { + return nil, errors.New("a listen address is required") + } + + if !(opts.HTTPServerOptions.TLSCertificate == "" && opts.HTTPServerOptions.TLSKey == "") && + !(opts.HTTPServerOptions.TLSCertificate != "" && opts.HTTPServerOptions.TLSKey != "") { + return nil, errors.New("both TLS certificate and key files must be defined") + } + + if len(opts.HTTPServerOptions.ResponseHeaders)%2 != 0 { + return nil, errors.New("response headers must be a list of pairs") + } + + logger, err := log.NewLogger() + if err != nil { + return nil, err + } + slogger := logger.Sugar().With("output", "http-server") + + logChan := make(chan []byte) + server := &http.Server{ + Addr: opts.Addr, + ReadTimeout: opts.HTTPServerOptions.ReadTimeout, + WriteTimeout: opts.HTTPServerOptions.WriteTimeout, + MaxHeaderBytes: 1 << 20, + Handler: newHandler(opts, logChan, slogger), + } + + return &Output{ + logger: slogger, + opts: opts, + server: server, + logChan: logChan, + }, nil +} + +func (o *Output) DialContext(ctx context.Context) error { + o.ctx = ctx + + if o.opts.TLSCertificate != "" && o.opts.TLSKey != "" { + go func() { o.logger.Info(o.server.ListenAndServeTLS(o.opts.TLSCertificate, o.opts.TLSKey)) }() + } else { + go func() { o.logger.Info(o.server.ListenAndServe()) }() + } + + return nil +} + +func (o *Output) Close() error { + defer close(o.logChan) + + o.logger.Infow("shutting down http_server...") + + ctx, cancel := context.WithTimeout(o.ctx, time.Second) + defer cancel() + + return o.server.Shutdown(ctx) +} + +func (o *Output) Write(b []byte) (int, error) { + if o.ctx == nil { + return 0, errors.New("DialContext needs to be called before Write can be used") + } + + select { + case <-o.ctx.Done(): + o.logger.Infow("the output has been closed") + return 0, nil + case o.logChan <- b: + return len(b), nil + } +} + +func newHandler(opts *output.Options, logChan <-chan []byte, logger *zap.SugaredLogger) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + b := <-logChan + + defer r.Body.Close() + logger.Debug(strRequest(r)) + + for i := 0; i < len(opts.HTTPServerOptions.ResponseHeaders); i += 2 { + w.Header().Add(opts.HTTPServerOptions.ResponseHeaders[i], opts.HTTPServerOptions.ResponseHeaders[i+1]) + } + + _, _ = w.Write(b) + } +} + +func strRequest(r *http.Request) string { + var b strings.Builder + b.WriteString("Request path: ") + b.WriteString(r.URL.String()) + b.WriteString(", Request Headers: ") + for k, v := range r.Header { + b.WriteString(fmt.Sprintf("'%s: %s' ", k, v)) + } + b.WriteString(", Request Body: ") + body, _ := ioutil.ReadAll(r.Body) + b.Write(body) + return b.String() +} diff --git a/pkg/output/httpserver/httpserver_test.go b/pkg/output/httpserver/httpserver_test.go new file mode 100644 index 0000000..c3670d1 --- /dev/null +++ b/pkg/output/httpserver/httpserver_test.go @@ -0,0 +1,101 @@ +// Licensed to Elasticsearch B.V. under one or more agreements. +// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information. + +package httpserver + +import ( + "context" + "io/ioutil" + "net/http" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/stream/pkg/output" +) + +func TestHTTPServer(t *testing.T) { + cases := []struct { + description string + opts output.HTTPServerOptions + input []string + expectedOutput []string + expectedHeaders http.Header + }{ + { + description: "can get one log per response", + input: []string{"a", "b", "c"}, + expectedOutput: []string{"a", "b", "c"}, + }, + { + description: "returns expected response headers", + opts: output.HTTPServerOptions{ + ResponseHeaders: []string{"content-type", "custom"}, + }, + input: []string{"a"}, + expectedOutput: []string{"a"}, + expectedHeaders: http.Header{ + "Content-Type": []string{"custom"}, + }, + }, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.description, func(t *testing.T) { + out, err := New(&output.Options{ + Addr: "127.0.0.1:1111", + HTTPServerOptions: tc.opts, + }) + + require.NoError(t, err) + require.NoError(t, out.DialContext(context.Background())) + + for i, in := range tc.input { + var n int + var werr error + var wg sync.WaitGroup + trigger := make(chan struct{}) + wg.Add(1) + go func(in string) { + defer wg.Done() + + timeout := time.NewTimer(time.Second) + defer timeout.Stop() + + select { + case <-timeout.C: + default: + close(trigger) + n, werr = out.Write([]byte(in)) + } + }(in) + + <-trigger + + resp, err := http.Get("http://127.0.0.1:1111") + require.NoError(t, err) + t.Cleanup(func() { resp.Body.Close() }) + + wg.Wait() + require.NoError(t, werr) + assert.Equal(t, len(in), n) + + body, err := ioutil.ReadAll(resp.Body) + require.NoError(t, err) + + assert.Equal(t, tc.expectedOutput[i], string(body)) + + for h, vs := range tc.expectedHeaders { + assert.EqualValues(t, vs, resp.Header[h]) + } + } + + require.NoError(t, out.Close()) + }) + } +} diff --git a/pkg/output/options.go b/pkg/output/options.go index 7847ddd..53ff185 100644 --- a/pkg/output/options.go +++ b/pkg/output/options.go @@ -17,6 +17,7 @@ type Options struct { WebhookOptions GCPPubsubOptions + HTTPServerOptions } type WebhookOptions struct { @@ -32,3 +33,11 @@ type GCPPubsubOptions struct { Subscription string // Subscription name. Will create it if not exists. Clear bool // Clear will clear all topics and subscriptions before running. } + +type HTTPServerOptions struct { + TLSCertificate string // TLS certificate file path. + TLSKey string // TLS key file path. + ResponseHeaders []string // KV list of response headers. + ReadTimeout time.Duration // HTTP Server read timeout. + WriteTimeout time.Duration // HTTP Server write timeout. +} diff --git a/pkg/output/util.go b/pkg/output/util.go index 4816914..06dfc25 100644 --- a/pkg/output/util.go +++ b/pkg/output/util.go @@ -8,8 +8,9 @@ import ( "context" "time" - "github.com/elastic/go-concert/timed" "go.uber.org/zap" + + "github.com/elastic/go-concert/timed" ) func Initialize(opts *Options, logger *zap.SugaredLogger, ctx context.Context) (Output, error) {