Skip to content

Commit

Permalink
HTTP output
Browse files Browse the repository at this point in the history
  • Loading branch information
marc-gr committed May 17, 2021
1 parent d013691 commit dd3c409
Show file tree
Hide file tree
Showing 5 changed files with 164 additions and 13 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ This project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

- Added HTTP Server output. [#19](https://github.com/elastic/stream/pull/19)

## [0.3.0]

- Added `--rate-limit` flag to control rate (in bytes/sec) of UDP streaming. [#12](https://github.com/elastic/stream/pull/12)
Expand Down
23 changes: 10 additions & 13 deletions command/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ import (
"os"
"os/signal"
"strings"
"time"

"github.com/elastic/go-concert/ctxtool/osctx"
"github.com/elastic/go-concert/timed"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sys/unix"

"github.com/elastic/stream/pkg/log"
"github.com/elastic/stream/pkg/output"

// Register outputs.
Expand All @@ -43,7 +44,7 @@ func Execute() error {
}

func ExecuteContext(ctx context.Context) error {
logger, err := logger()
logger, err := log.NewLogger()
if err != nil {
return nil
}
Expand Down Expand Up @@ -72,6 +73,13 @@ func ExecuteContext(ctx context.Context) error {
rootCmd.PersistentFlags().StringVar(&opts.GCPPubsubOptions.Subscription, "gcppubsub-subscription", "subscription", "GCP Pubsub subscription name")
rootCmd.PersistentFlags().BoolVar(&opts.GCPPubsubOptions.Clear, "gcppubsub-clear", true, "GCP Pubsub clear flag")

// HTTP output flags.
rootCmd.PersistentFlags().DurationVar(&opts.HTTPSrvOptions.ReadTimeout, "httpsrv-read-timeout", 5*time.Second, "HTTP Server read timeout")
rootCmd.PersistentFlags().DurationVar(&opts.HTTPSrvOptions.WriteTimeout, "httpsrv-write-timeout", 5*time.Second, "HTTP Server write timeout")
rootCmd.PersistentFlags().StringVar(&opts.HTTPSrvOptions.TLSCertificate, "httpsrv-tls-cert", "", "Path to the TLS certificate")
rootCmd.PersistentFlags().StringVar(&opts.HTTPSrvOptions.TLSKey, "httpsrv-tls-key", "", "Path to the TLS key file")
rootCmd.PersistentFlags().StringArrayVar(&opts.HTTPSrvOptions.ResponseHeaders, "httpsrv-response-headers", []string{"content-type", "application/json"}, "List of headers key-values")

// Sub-commands.
rootCmd.AddCommand(newLogRunner(&opts, logger))
rootCmd.AddCommand(newPCAPRunner(&opts, logger))
Expand All @@ -91,17 +99,6 @@ func ExecuteContext(ctx context.Context) error {
return rootCmd.ExecuteContext(ctx)
}

func logger() (*zap.Logger, error) {
conf := zap.NewProductionConfig()
conf.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
conf.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
log, err := conf.Build()
if err != nil {
return nil, err
}
return log, nil
}

func waitForStartSignal(opts *output.Options, parent context.Context, logger *zap.Logger) error {
if opts.StartSignal == "" {
return nil
Expand Down
17 changes: 17 additions & 0 deletions pkg/log/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package log

import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

func NewLogger() (*zap.Logger, error) {
conf := zap.NewProductionConfig()
conf.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
conf.Level = zap.NewAtomicLevelAt(zap.DebugLevel)
log, err := conf.Build()
if err != nil {
return nil, err
}
return log, nil
}
126 changes: 126 additions & 0 deletions pkg/output/http/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Licensed to Elasticsearch B.V. under one or more agreements.
// Elasticsearch B.V. licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
package http

import (
"context"
"errors"
"fmt"
"io/ioutil"
"net/http"
"strings"
"time"

"go.uber.org/zap"

"github.com/elastic/stream/pkg/log"
"github.com/elastic/stream/pkg/output"
)

func init() {
output.Register("httpsrv", New)
}

type Output struct {
logger *zap.SugaredLogger
opts *output.Options
server *http.Server
logChan chan []byte
}

func New(opts *output.Options) (output.Output, error) {
if opts.Addr == "" {
return nil, errors.New("a listen address is required")
}

if !(opts.HTTPSrvOptions.TLSCertificate == "" && opts.HTTPSrvOptions.TLSKey == "") &&
!(opts.HTTPSrvOptions.TLSCertificate != "" && opts.HTTPSrvOptions.TLSKey != "") {
return nil, errors.New("both TLS certificate and key files must be defined")
}

if len(opts.HTTPSrvOptions.ResponseHeaders)%2 != 0 {
return nil, errors.New("response headers must be a list of pairs")
}

logger, err := log.NewLogger()
if err != nil {
return nil, err
}
slogger := logger.Sugar().With("output", "httpsrv")

logChan := make(chan []byte)
server := &http.Server{
Addr: opts.Addr,
ReadTimeout: opts.HTTPSrvOptions.ReadTimeout,
WriteTimeout: opts.HTTPSrvOptions.WriteTimeout,
MaxHeaderBytes: 1 << 20,
Handler: newHandler(opts, logChan, slogger),
}

return &Output{
logger: slogger,
opts: opts,
server: server,
logChan: logChan,
}, nil
}

func (o *Output) DialContext(ctx context.Context) error {
if o.opts.TLSCertificate != "" && o.opts.TLSKey != "" {
go func() { o.logger.Info(o.server.ListenAndServeTLS(o.opts.TLSCertificate, o.opts.TLSKey)) }()
} else {
go func() { o.logger.Info(o.server.ListenAndServe()) }()
}
return nil
}

func (o *Output) Close() error {
defer close(o.logChan)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

_ = o.server.Shutdown(ctx)
return nil
}

func (o *Output) Write(b []byte) (int, error) {
timer := time.NewTimer(time.Minute)
defer timer.Stop()
select {
case <-timer.C:
return 0, errors.New("waiting to write for too long")
case o.logChan <- b:
return len(b), nil
}
}

func newHandler(opts *output.Options, logChan <-chan []byte, logger *zap.SugaredLogger) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
b := <-logChan

defer r.Body.Close()
logger.Debug(strRequest(r))

for i := 0; i < len(opts.HTTPSrvOptions.ResponseHeaders); i += 2 {
w.Header().Add(opts.HTTPSrvOptions.ResponseHeaders[i], opts.HTTPSrvOptions.ResponseHeaders[i+1])
}

_, _ = w.Write(b)
}
}

func strRequest(r *http.Request) string {
var b strings.Builder
b.WriteString("Request path: ")
b.WriteString(r.URL.String())
b.WriteString(", Request Headers: ")
for k, v := range r.Header {
b.WriteString(fmt.Sprintf("'%s: %s' ", k, v))
}
b.WriteString(", Request Body: ")
body, _ := ioutil.ReadAll(r.Body)
b.Write(body)
return b.String()
}
9 changes: 9 additions & 0 deletions pkg/output/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Options struct {

WebhookOptions
GCPPubsubOptions
HTTPSrvOptions
}

type WebhookOptions struct {
Expand All @@ -32,3 +33,11 @@ type GCPPubsubOptions struct {
Subscription string // Subscription name. Will create it if not exists.
Clear bool // Clear will clear all topics and subscriptions before running.
}

type HTTPSrvOptions struct {
TLSCertificate string // TLS certificate file path.
TLSKey string // TLS key file path.
ResponseHeaders []string // KV list of response headers.
ReadTimeout time.Duration // HTTP Server read timeout.
WriteTimeout time.Duration // HTTP Server write timeout.
}

0 comments on commit dd3c409

Please sign in to comment.