Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add HTTP server output #19

Merged
merged 6 commits into from
May 28, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ This project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

- Added HTTP Server output. [#19](https://github.com/elastic/stream/pull/19)

## [0.3.0]

- Added `--rate-limit` flag to control rate (in bytes/sec) of UDP streaming. [#12](https://github.com/elastic/stream/pull/12)
Expand Down
11 changes: 4 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
LICENSE := ASL2-Short
VERSION ?= latest
VERSION ?= local

check-fmt: goimports go-licenser
@go-licenser -d -license ${LICENSE}
@goimports -l -e -local github.com/andrewkroh . | read && echo "Code differs from gofmt's style. Run 'gofmt -w .'" 1>&2 && exit 1 || true
@goimports -l -e -local github.com/elastic . | read && echo "Code differs from gofmt's style. Run 'gofmt -w .'" 1>&2 && exit 1 || true

docker:
docker build -t akroh/stream:${VERSION} .

publish: docker
docker push akroh/stream:${VERSION}
docker build -t docker.elastic.co/observability/stream:${VERSION} .

fmt: goimports go-licenser
go-licenser -license ${LICENSE}
goimports -l -w -local github.com/andrewkroh .
goimports -l -w -local github.com/elastic .

goimports:
GO111MODULE=off go get golang.org/x/tools/cmd/goimports
Expand Down
29 changes: 14 additions & 15 deletions command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,23 @@ import (
"os"
"os/signal"
"strings"
"time"

"github.com/elastic/go-concert/ctxtool/osctx"
"github.com/elastic/go-concert/timed"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sys/unix"

"github.com/elastic/go-concert/ctxtool/osctx"
"github.com/elastic/go-concert/timed"

"github.com/elastic/stream/pkg/log"
"github.com/elastic/stream/pkg/output"

// Register outputs.
marc-gr marked this conversation as resolved.
Show resolved Hide resolved
_ "github.com/elastic/stream/pkg/output/gcppubsub"
_ "github.com/elastic/stream/pkg/output/httpserver"
_ "github.com/elastic/stream/pkg/output/tcp"
_ "github.com/elastic/stream/pkg/output/tls"
_ "github.com/elastic/stream/pkg/output/udp"
Expand All @@ -43,7 +46,7 @@ func Execute() error {
}

func ExecuteContext(ctx context.Context) error {
logger, err := logger()
logger, err := log.NewLogger()
if err != nil {
return nil
}
Expand Down Expand Up @@ -72,6 +75,13 @@ func ExecuteContext(ctx context.Context) error {
rootCmd.PersistentFlags().StringVar(&opts.GCPPubsubOptions.Subscription, "gcppubsub-subscription", "subscription", "GCP Pubsub subscription name")
rootCmd.PersistentFlags().BoolVar(&opts.GCPPubsubOptions.Clear, "gcppubsub-clear", true, "GCP Pubsub clear flag")

// HTTP output flags.
rootCmd.PersistentFlags().DurationVar(&opts.HTTPServerOptions.ReadTimeout, "http-server-read-timeout", 5*time.Second, "HTTP Server read timeout")
rootCmd.PersistentFlags().DurationVar(&opts.HTTPServerOptions.WriteTimeout, "http-server-write-timeout", 5*time.Second, "HTTP Server write timeout")
rootCmd.PersistentFlags().StringVar(&opts.HTTPServerOptions.TLSCertificate, "http-server-tls-cert", "", "Path to the TLS certificate")
rootCmd.PersistentFlags().StringVar(&opts.HTTPServerOptions.TLSKey, "http-server-tls-key", "", "Path to the TLS key file")
rootCmd.PersistentFlags().StringArrayVar(&opts.HTTPServerOptions.ResponseHeaders, "http-server-response-headers", []string{"content-type", "application/json"}, "List of headers key-values")

// Sub-commands.
rootCmd.AddCommand(newLogRunner(&opts, logger))
rootCmd.AddCommand(newPCAPRunner(&opts, logger))
Expand All @@ -91,17 +101,6 @@ func ExecuteContext(ctx context.Context) error {
return rootCmd.ExecuteContext(ctx)
}

func logger() (*zap.Logger, error) {
conf := zap.NewProductionConfig()
conf.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
conf.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
log, err := conf.Build()
if err != nil {
return nil, err
}
return log, nil
}

func waitForStartSignal(opts *output.Options, parent context.Context, logger *zap.Logger) error {
if opts.StartSignal == "" {
return nil
Expand Down
21 changes: 21 additions & 0 deletions pkg/log/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Licensed to Elasticsearch B.V. under one or more agreements.
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

package log

import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

func NewLogger() (*zap.Logger, error) {
conf := zap.NewProductionConfig()
conf.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
conf.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
log, err := conf.Build()
if err != nil {
return nil, err
}
return log, nil
}
134 changes: 134 additions & 0 deletions pkg/output/httpserver/httpserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Licensed to Elasticsearch B.V. under one or more agreements.
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
package httpserver

import (
"context"
"errors"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"

"go.uber.org/zap"

"github.com/elastic/stream/pkg/log"
"github.com/elastic/stream/pkg/output"
)

func init() {
output.Register("http-server", New)
}

type Output struct {
logger *zap.SugaredLogger
opts *output.Options
server *http.Server
logChan chan []byte
ctx context.Context
}

func New(opts *output.Options) (output.Output, error) {
if opts.Addr == "" {
return nil, errors.New("a listen address is required")
}

if !(opts.HTTPServerOptions.TLSCertificate == "" && opts.HTTPServerOptions.TLSKey == "") &&
!(opts.HTTPServerOptions.TLSCertificate != "" && opts.HTTPServerOptions.TLSKey != "") {
return nil, errors.New("both TLS certificate and key files must be defined")
}

if len(opts.HTTPServerOptions.ResponseHeaders)%2 != 0 {
return nil, errors.New("response headers must be a list of pairs")
}

logger, err := log.NewLogger()
if err != nil {
return nil, err
}
slogger := logger.Sugar().With("output", "http-server")

logChan := make(chan []byte)
server := &http.Server{
Addr: opts.Addr,
ReadTimeout: opts.HTTPServerOptions.ReadTimeout,
WriteTimeout: opts.HTTPServerOptions.WriteTimeout,
MaxHeaderBytes: 1 << 20,
Handler: newHandler(opts, logChan, slogger),
}

return &Output{
logger: slogger,
opts: opts,
server: server,
logChan: logChan,
}, nil
}

func (o *Output) DialContext(ctx context.Context) error {
o.ctx = ctx

if o.opts.TLSCertificate != "" && o.opts.TLSKey != "" {
go func() { o.logger.Info(o.server.ListenAndServeTLS(o.opts.TLSCertificate, o.opts.TLSKey)) }()
} else {
go func() { o.logger.Info(o.server.ListenAndServe()) }()
}

return nil
}

func (o *Output) Close() error {
defer close(o.logChan)

o.logger.Infow("shutting down http_server...")

ctx, cancel := context.WithTimeout(o.ctx, time.Second)
defer cancel()

return o.server.Shutdown(ctx)
}

func (o *Output) Write(b []byte) (int, error) {
if o.ctx == nil {
return 0, errors.New("DialContext needs to be called before Write can be used")
}

select {
case <-o.ctx.Done():
o.logger.Infow("the output has been closed")
return 0, nil
case o.logChan <- b:
return len(b), nil
}
}

func newHandler(opts *output.Options, logChan <-chan []byte, logger *zap.SugaredLogger) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
b := <-logChan

defer r.Body.Close()
logger.Debug(strRequest(r))

for i := 0; i < len(opts.HTTPServerOptions.ResponseHeaders); i += 2 {
w.Header().Add(opts.HTTPServerOptions.ResponseHeaders[i], opts.HTTPServerOptions.ResponseHeaders[i+1])
}

_, _ = w.Write(b)
}
}

func strRequest(r *http.Request) string {
var b strings.Builder
b.WriteString("Request path: ")
b.WriteString(r.URL.String())
b.WriteString(", Request Headers: ")
for k, v := range r.Header {
b.WriteString(fmt.Sprintf("'%s: %s' ", k, v))
}
b.WriteString(", Request Body: ")
body, _ := ioutil.ReadAll(r.Body)
b.Write(body)
return b.String()
}
98 changes: 98 additions & 0 deletions pkg/output/httpserver/httpserver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Licensed to Elasticsearch B.V. under one or more agreements.
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

package httpserver

import (
"context"
"io/ioutil"
"net/http"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/stream/pkg/output"
)

func TestHTTPServer(t *testing.T) {
cases := []struct {
description string
opts output.HTTPServerOptions
input []string
expectedOutput []string
expectedHeaders http.Header
}{
{
description: "can get one log per response",
input: []string{"a", "b", "c"},
expectedOutput: []string{"a", "b", "c"},
},
{
description: "returns expected response headers",
opts: output.HTTPServerOptions{
ResponseHeaders: []string{"content-type", "custom"},
},
input: []string{"a"},
expectedOutput: []string{"a"},
expectedHeaders: http.Header{
"Content-Type": []string{"custom"},
},
},
}

for _, tc := range cases {
tc := tc
t.Run(tc.description, func(t *testing.T) {
out, err := New(&output.Options{
Addr: "127.0.0.1:1111",
HTTPServerOptions: tc.opts,
})

require.NoError(t, err)
require.NoError(t, out.DialContext(context.Background()))

for i, in := range tc.input {
var n int
var werr error
var wg sync.WaitGroup

wg.Add(1)
go func(in string) {
defer wg.Done()

timeout := time.NewTimer(time.Second)
defer timeout.Stop()

select {
case <-timeout.C:
default:
n, werr = out.Write([]byte(in))
}
}(in)

resp, err := http.Get("http://127.0.0.1:1111")
require.NoError(t, err)
t.Cleanup(func() { resp.Body.Close() })

wg.Wait()
require.NoError(t, werr)
assert.Equal(t, len(in), n)

body, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)

assert.Equal(t, tc.expectedOutput[i], string(body))

for h, vs := range tc.expectedHeaders {
assert.EqualValues(t, vs, resp.Header[h])
}
}

require.NoError(t, out.Close())
})
}
}
9 changes: 9 additions & 0 deletions pkg/output/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Options struct {

WebhookOptions
GCPPubsubOptions
HTTPServerOptions
}

type WebhookOptions struct {
Expand All @@ -32,3 +33,11 @@ type GCPPubsubOptions struct {
Subscription string // Subscription name. Will create it if not exists.
Clear bool // Clear will clear all topics and subscriptions before running.
}

type HTTPServerOptions struct {
TLSCertificate string // TLS certificate file path.
TLSKey string // TLS key file path.
ResponseHeaders []string // KV list of response headers.
ReadTimeout time.Duration // HTTP Server read timeout.
WriteTimeout time.Duration // HTTP Server write timeout.
}
3 changes: 2 additions & 1 deletion pkg/output/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import (
"context"
"time"

"github.com/elastic/go-concert/timed"
"go.uber.org/zap"

"github.com/elastic/go-concert/timed"
)

func Initialize(opts *Options, logger *zap.SugaredLogger, ctx context.Context) (Output, error) {
Expand Down